[jira] [Updated] (FLINK-33799) Add e2e's for tls enabled operator

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33799:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Add e2e's for tls enabled operator
> --
>
> Key: FLINK-33799
> URL: https://issues.apache.org/jira/browse/FLINK-33799
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Tony Garrard
>Priority: Minor
> Fix For: kubernetes-operator-1.9.0
>
>
> It would be good to create some E2E tests to ensure a tls enabled flink 
> operator works, so that we don't break anything in the future



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34151) Integrate Karpenter resource limits into cluster capacity check

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34151:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Integrate Karpenter resource limits into cluster capacity check
> ---
>
> Key: FLINK-34151
> URL: https://issues.apache.org/jira/browse/FLINK-34151
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> FLINK-33771 added cluster capacity checking for Flink Autoscaling decisions. 
> The checks respect the scaling limits of the Kubernetes Cluster Autoscaler. 
> We should also support Karpenter-based resource checks, as Karpenter is the 
> preferred method of expanding the cluster size in some environments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32120) Add autoscaler config option to disable parallelism key group alignment

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-32120:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Add autoscaler config option to disable parallelism key group alignment
> ---
>
> Key: FLINK-32120
> URL: https://issues.apache.org/jira/browse/FLINK-32120
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> After choosing the target parallelism for a vertex, we choose a higher 
> parallelism if that parallelism leads to evenly spreading the number of key 
> groups. The number of key groups is derived from the max parallelism.
> The amount of actual skew we would introduce if we did not do the alignment 
> would usually be pretty low. In fact, the data itself can have an uneven load 
> distribution across the keys (hot keys). In this case, the key group 
> alignment is not effective.
> For experiments, we should allow disabling the key group alignment via a 
> configuration option.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32119) Revise source partition skew logic

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-32119:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Revise source partition skew logic 
> ---
>
> Key: FLINK-32119
> URL: https://issues.apache.org/jira/browse/FLINK-32119
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> After choosing the target parallelism for a vertex, we choose a higher 
> parallelism if that parallelism leads to evenly spreading the number of key 
> groups (=max parallelism).
> Sources don't have keyed state, so this adjustment does not make sense for 
> key groups. However, we internally limit the max parallelism of sources to 
> the number of partitions discovered. This prevents partition skew. 
> The partition skew logic currently doesn’t work correctly when there are 
> multiple topics because we use the total number of partitions discovered. 
> Using a single max parallelism doesn’t yield skew free partition distribution 
> then. However, this is also true for a single topic when the number of 
> partitions is a prime number or a not easily divisible number. 
> Hence, we should add an option to guarantee skew free partition distribution 
> which means using the total number of partitions when another configuration 
> is not possible. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34540) Tune number of task slots

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34540:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Tune number of task slots
> -
>
> Key: FLINK-34540
> URL: https://issues.apache.org/jira/browse/FLINK-34540
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31502) Limit the number of concurrent scale operations to reduce cluster churn

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-31502:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Limit the number of concurrent scale operations to reduce cluster churn
> ---
>
> Key: FLINK-31502
> URL: https://issues.apache.org/jira/browse/FLINK-31502
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> Until we move to using the upcoming Rescale API which recycles pods, we need 
> to be mindful with how many deployments we scale at the same time because 
> each of them is going to give up all its pods and require the new number of 
> required pods. 
> This can cause churn in the cluster and temporary lead to "unallocatable" 
> pods which triggers the k8s cluster autoscaler to add more cluster nodes. 
> That is often not desirable because the actual required resources after the 
> scaling have been settled, are lower.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33789) Expose restart time as a metric

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33789:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Expose restart time as a metric
> ---
>
> Key: FLINK-33789
> URL: https://issues.apache.org/jira/browse/FLINK-33789
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> FLINK-30593 added restart time tracking. It would be convenient to also 
> report is as a metric.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33773) Add fairness to scaling decisions

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33773:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Add fairness to scaling decisions
> -
>
> Key: FLINK-33773
> URL: https://issues.apache.org/jira/browse/FLINK-33773
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Deployment / Kubernetes
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> The current scaling logic is inherently unfair. In a scenario of heavy 
> backlog, whichever pipelines come first, they will end up taking most of the 
> resources. Some kind of fairness should be introduced, for example:
> * Cap the max number of resulting pods at a % of the cluster resources
> * Allow scale up round-robin across all pipelines



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33673) SizeLimits not being set on emptyDir

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33673:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> SizeLimits not being set on emptyDir
> 
>
> Key: FLINK-33673
> URL: https://issues.apache.org/jira/browse/FLINK-33673
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Tony Garrard
>Priority: Minor
> Fix For: kubernetes-operator-1.9.0
>
>
> The operator should set a sizeLimit on any emptyDir's it creates. See 
> [https://main.kyverno.io/policies/other/a/add-emptydir-sizelimit/add-emptydir-sizelimit/.
>  
> |https://main.kyverno.io/policies/other/a/add-emptydir-sizelimit/add-emptydir-sizelimit/]
> This issue is to set a sizeLimit. The default one in question is for 
> artifacts. My initial guess at a setting would be around 512Mb



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34539) Tune JobManager memory

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Tune JobManager memory
> --
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> Adjustments similar to FLINK-34152, but simpler because we only need to 
> adjust heap memory and metaspace for the JobManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34389) JdbcAutoscalerStateStore explicitly writes update_time

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34389:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> JdbcAutoscalerStateStore explicitly writes update_time
> --
>
> Key: FLINK-34389
> URL: https://issues.apache.org/jira/browse/FLINK-34389
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.8.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> JdbcAutoscalerStateStore explicitly writes update_time instead of relying on 
> the database to update.
> Some databases doesn't support update the timestamp column automatically. For 
> example, Derby doesn't support update the update_time automatically when we 
> update any data. It's hard to do a general test during I developing the test 
> for JdbcAutoscalerEventHandler.
>  
> As the common source service, in order to support all databases well, 
> it's better to handle it inside of the service.
>  
> In order to unify the design for JdbcAutoscalerEventHandler and 
> JdbcAutoscalerStateStore, we update the design of JdbcAutoscalerStateStore in 
> this JIRA.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34152) Tune TaskManager memory

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-34152.

Release Note: TaskManager memory (heap, network, metaspace, managed) is 
optimized together with autoscaling decisions.
  Resolution: Fixed

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34619) Do not wait for scaling completion in UPGRADE state with in-place scaling

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-34619.

Release Note: Merged via 8938658ed245545e6436ff22cbb8b2fabd4047f1
  Resolution: Fixed

> Do not wait for scaling completion in UPGRADE state with in-place scaling
> -
>
> Key: FLINK-34619
> URL: https://issues.apache.org/jira/browse/FLINK-34619
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The operator currently puts the resource into upgrading state after 
> triggering in-place scaling and keeps observing until the desired parallelism 
> is reached before moving to deployed / stable. 
> However this means that due to how the adaptive scheduler works this 
> parallelism may never be reached and this is expected.
> We should simplify the logic to consider scaling "done" once the resource 
> requirements have been set correctly and then leave the rest to the adaptive 
> scheduler



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34104) Improve the ScalingReport format of autoscaling

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-34104.

Resolution: Fixed

> Improve the ScalingReport format of autoscaling
> ---
>
> Key: FLINK-34104
> URL: https://issues.apache.org/jira/browse/FLINK-34104
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Currently, the scaling report format is 
> {color:#6a8759} Vertex ID %s | Parallelism %d -> %d | Processing capacity 
> %.2f -> %.2f | Target data rate %.2f{color}
> {color:#172b4d}It has 2 disadvantages:{color}
>  # {color:#172b4d}When one job has multiple vertices, the report of all 
> vertices are mixed together without any separator{color}{color:#172b4d}, here 
> is an example:{color}
>  ** {color:#172b4d}Scaling execution enabled, begin scaling vertices: Vertex 
> ID ea632d67b7d595e5b851708ae9ad79d6 | Parallelism 2 -> 1 | Processing 
> capacity 800466.67 -> 320186.67 | Target data rate 715.10 Vertex ID 
> bc764cd8ddf7a0cff126f51c16239658 | Parallelism 2 -> 1 | Processing capacity 
> 79252.08 -> 31700.83 | Target data rate 895.93 Vertex ID 
> 0a448493b4782967b150582570326227 | Parallelism 8 -> 16 | Processing capacity 
> 716.05 -> 1141.00 | Target data rate 715.54{color}
>  ** {color:#172b4d}We can see the Vertex ID is the beginning of each vertex 
> report, it doesn't have any {color}{color:#172b4d}separator with the last 
> vertex.{color}
>  # {color:#172b4d}This format is non-standard{color}{color:#172b4d}, it's 
> hard to deserialize.{color}
>  ** {color:#172b4d}When job enables the autoscaler and disable the 
> scaling.{color}
>  ** {color:#172b4d}Flink platform maintainer wants to show the scaling report 
> in WebUI, it's helpful to using the report result for flink users.{color}
>  ** {color:#172b4d}So easy to deserialize is useful for these flink 
> platform.{color}
> h2. {color:#172b4d}Solution:{color}
>  * {color:#172b4d}Adding the {{{}} and {{}}} as the separator between 
> multiple vertices. {color}
>  * {color:#172b4d}Adding the {{AutoscalerEventUtils}} to easy deserialize the 
> {{ScalingReport}} message.{color}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-13 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17826015#comment-17826015
 ] 

Maximilian Michels commented on FLINK-34655:


Thanks for raising awareness for the Flink version compatibility, [~fanrui]! 
Although we've been using Flink Autoscaling with 1.16, it is true that only 
Flink 1.17 supports it out of the box.
{quote}In the short term, we only use the autoscaler to give suggestion instead 
of scaling directly. After our users think the parallelism calculation is 
reliable, they will have stronger motivation to upgrade the flink version.
{quote}
I understand the idea behind providing suggestions. However, it is difficult to 
assess the quality of Autoscaling decisions without applying them 
automatically. The reason is that suggestions become stale very quickly if the 
load pattern is not completely static. Even for static load patterns, if the 
user doesn't redeploy in a matter of minutes, the suggestions might already be 
stale again when the number of pending records increased too much. In any case, 
production load patterns are rarely static which means that autoscaling will 
inevitable trigger multiple times a day, but that is where its real power is 
unleashed. It would be great to hear about any concerns your users have for 
turning on automatic scaling. We've been operating it in production for about a 
year now.

Back to the issue here, should we think about a patch release for 1.15 / 1.16 
to add support for overriding vertex parallelism?

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> flink-ubernetes-operator is committed to supporting the latest 4 flink minor 
> versions, and autoscaler is a part of flink-ubernetes-operator. Currently,  
> the latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But autoscaler doesn't work for  flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties in IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for autoscaler [1]
> * flink's RestClient doesn't allow miss any property during deserializing the 
> json
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> - [[FLINK-34655](https://issues.apache.org/jira/browse/FLINK-34655)] Copy 
> IOMetricsInfo to flink-autoscaler-standalone module
> - Removing them after 1.15 are not supported
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34152) Tune TaskManager memory

2024-03-02 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822790#comment-17822790
 ] 

Maximilian Michels edited comment on FLINK-34152 at 3/2/24 9:11 AM:


{quote}What I'm trying to say is that I didn't notice any updated configuration 
for the memory request and limit at the pod template level for the taskmanager. 
Therefore, I assume that the pod's memory allocation won't automatically adjust 
to reflect changes in the taskmanager's heap size, unless I've missed something.
{quote}
I would ask you to check again. When the tuning is applied, the pod resource 
requests/limits of the TaskManager pods will be adjusted. So changes will be 
directly reflected in terms of resource usage in Kubernetes.
{quote}Indeed, by implementing bin-packing, we can optimize resource 
utilization, which is now clearer to me. However, its management becomes more 
complex (K8s upgrade, daily node restart/eviction) for sure, especially when 
there are other application components in the same Kubernetes cluster IMO
{quote}
You are right, there is more complexity to realize Flink Autoscaling benefits 
end to end. However, there is also a great amount of resource savings and 
convenience for the user that come out of it. We have seen 60% fewer nodes 
after enabling Flink Autoscaling while maintaing the same amount of service and 
drastically decreasing the maintaince for our users who would have to adjust 
parallelism constantly to run cost-efficient. They usually did not want to do 
that and thus all jobs ran very over-provisioned.
{quote}Can you take a look on it 
https://issues.apache.org/jira/browse/FLINK-34563 and 
[https://github.com/apache/flink-kubernetes-operator/pull/787] ? And tell me if 
you think it's making sense, thanks :)
{quote}
Thank you, I'll review in the next days!


was (Author: mxm):
{quote}What I'm trying to say is that I didn't notice any updated configuration 
for the memory request and limit at the pod template level for the taskmanager. 
Therefore, I assume that the pod's memory allocation won't automatically adjust 
to reflect changes in the taskmanager's heap size, unless I've missed something.
{quote}
I would ask you to check again. When the tuning is applied, the pod resource 
requests/limits of the TaskManager pods will be adjusted. So changes will be 
directly reflected in terms of resource usage in Kubernetes.
{quote}Indeed, by implementing bin-packing, we can optimize resource 
utilization, which is now clearer to me. However, its management becomes more 
complex (K8s upgrade, daily node restart/eviction) for sure, especially when 
there are other application components in the same Kubernetes cluster IMO
{quote}
You are right, there is more complexity to realize Flink Autoscaling benefits 
end to end. However, there is also a great amount of resource savings and 
convenience for the user that come out of it. We have seen 60% fewer nodes 
after enabling Flink Autoscaling while maintaing the same amount of service and 
drastically decreasing the maintaince for our users who would have to adjust 
parallelism constantly to run cost-efficient. They usually did not want to do 
that and thus all jobs ran very over-provisioned.
{quote}Can you take a look on it 
https://issues.apache.org/jira/browse/FLINK-34563 and 
[https://github.com/apache/flink-kubernetes-operator/pull/787] ? And tell me if 
you think it's making sense, thanks :)
{quote}
Thank you, I'll review in the next days. Generally, I wonder. This idea has 
crossed my mind before. I wasn't really sure how exactly to adjust the 
parallelism to fill the TaskManagers. Adjusting only the vertex with the 
highest parallelism might be unfair to other vertices. I think spreading out 
the unused task slots to the vertices with ther lowest parallelism might be 
more beneficial for the stability. We have seen more instability with lower 
parallelisms because the metrics are less precise.

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on 

[jira] [Commented] (FLINK-34152) Tune TaskManager memory

2024-03-02 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822790#comment-17822790
 ] 

Maximilian Michels commented on FLINK-34152:


{quote}What I'm trying to say is that I didn't notice any updated configuration 
for the memory request and limit at the pod template level for the taskmanager. 
Therefore, I assume that the pod's memory allocation won't automatically adjust 
to reflect changes in the taskmanager's heap size, unless I've missed something.
{quote}
I would ask you to check again. When the tuning is applied, the pod resource 
requests/limits of the TaskManager pods will be adjusted. So changes will be 
directly reflected in terms of resource usage in Kubernetes.
{quote}Indeed, by implementing bin-packing, we can optimize resource 
utilization, which is now clearer to me. However, its management becomes more 
complex (K8s upgrade, daily node restart/eviction) for sure, especially when 
there are other application components in the same Kubernetes cluster IMO
{quote}
You are right, there is more complexity to realize Flink Autoscaling benefits 
end to end. However, there is also a great amount of resource savings and 
convenience for the user that come out of it. We have seen 60% fewer nodes 
after enabling Flink Autoscaling while maintaing the same amount of service and 
drastically decreasing the maintaince for our users who would have to adjust 
parallelism constantly to run cost-efficient. They usually did not want to do 
that and thus all jobs ran very over-provisioned.
{quote}Can you take a look on it 
https://issues.apache.org/jira/browse/FLINK-34563 and 
[https://github.com/apache/flink-kubernetes-operator/pull/787] ? And tell me if 
you think it's making sense, thanks :)
{quote}
Thank you, I'll review in the next days. Generally, I wonder. This idea has 
crossed my mind before. I wasn't really sure how exactly to adjust the 
parallelism to fill the TaskManagers. Adjusting only the vertex with the 
highest parallelism might be unfair to other vertices. I think spreading out 
the unused task slots to the vertices with ther lowest parallelism might be 
more beneficial for the stability. We have seen more instability with lower 
parallelisms because the metrics are less precise.

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34152) Tune TaskManager memory

2024-03-01 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822512#comment-17822512
 ] 

Maximilian Michels commented on FLINK-34152:


Hi [~yang]! Thanks for taking a look at the recent changes. There has been two 
more follow-up PRs since the initial PR you linked. I'm very curious to hear 
your feedback.
{quote}We may need to dynamically adjust the Kubernetes CPU and memory limits 
for both the job manager and task manager eventually, to align with the 
automatically tuned memory and CPU parameters and prevent unnecessary resource 
allocation.
{quote}
Tuning JobManager memory is still pending, but I agree that tuning only 
TaskManagers is not enough. As for tuning CPU, I think we eventually want to 
tune the number of task slots to fit them to the CPUs assigned. As for scaling 
CPU itself, that is already taken care of by the autoscaler which essentially 
scales based on the CPU usage of TaskManagers.
{quote}In our specific use-case, our Flink cluster is deployed on a dedicated 
node group with predefined CPU and memory settings, unlike a typical Kubernetes 
cluster. Consequently, this auto-tuning feature might not aid in reducing 
infrastructure costs, as billing is based on the allocated nodes behind the 
scenes.
{quote}
Autoscaling assumes some sort of Kubernetes Cluster Autoscaling to be active. 
When fewer resources are allocated, that should result in fewer nodes, but in 
practice it isn't quite that easy. It requires a bit of extra work for nodes to 
get released when fewer resources are used. The default Kubernetes scheduler 
doesn't bin-pack, but it can be reconfigured to do bin-packing as opposed to 
its default behavior to evenly spread out pods.

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune TaskManager memory

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory  (was: Tune TaskManager memory of 
autoscaled jobs)

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune TaskManager memory of austoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory of austoscaled jobs  (was: Tune 
TaskManager memory)

> Tune TaskManager memory of austoscaled jobs
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34540) Tune number of task slots

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-34540:
--

Assignee: (was: Maximilian Michels)

> Tune number of task slots
> -
>
> Key: FLINK-34540
> URL: https://issues.apache.org/jira/browse/FLINK-34540
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory of autoscaled jobs  (was: Tune TaskManager 
memory of austoscaled jobs)

> Tune TaskManager memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34540) Tune number of task slots

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34540:
---
Description: (was: Adjustments similar to FLINK-34152, but simpler 
because we only need to adjust heap memory and metaspace for the JobManager.)

> Tune number of task slots
> -
>
> Key: FLINK-34540
> URL: https://issues.apache.org/jira/browse/FLINK-34540
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34539) Tune JobManager memory

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Summary: Tune JobManager memory  (was: Tune JobManager memory of autoscaled 
jobs)

> Tune JobManager memory
> --
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> Adjustments similar to FLINK-34152, but simpler because we only need to 
> adjust heap memory and metaspace for the JobManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune TaskManager memory

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory  (was: Tune TaskManager memory of 
autoscaled jobs)

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34540) Tune number of task slots

2024-02-28 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34540:
--

 Summary: Tune number of task slots
 Key: FLINK-34540
 URL: https://issues.apache.org/jira/browse/FLINK-34540
 Project: Flink
  Issue Type: Sub-task
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


Adjustments similar to FLINK-34152, but simpler because we only need to adjust 
heap memory and metaspace for the JobManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Description: Umbrella issue to tackle tuning the Flink configuration as 
part of Flink Autoscaling.  (was: Umbrella issue to tackle)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>
> Umbrella issue to tackle tuning the Flink configuration as part of Flink 
> Autoscaling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Labels:   (was: pull-request-available)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>
> Umbrella issue to tackle



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Description: Umbrella issue to tackle  (was: The current autoscaling 
algorithm adjusts the parallelism of the job task vertices according to the 
processing needs. By adjusting the parallelism, we systematically scale the 
amount of CPU for a task. At the same time, we also indirectly change the 
amount of memory tasks have at their dispense. However, there are some problems 
with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

[https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit])

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Umbrella issue to tackle



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Fix Version/s: (was: kubernetes-operator-1.8.0)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
>
> Umbrella issue to tackle



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Summary: Tune Flink config of autoscaled jobs  (was: Tune memory of 
autoscaled jobs)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Labels:   (was: pull-request-available)

> Tune JobManager memory of autoscaled jobs
> -
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> Adjustments similar to FLINK-34152, but simpler because we only need to 
> adjust heap memory and metaspace for the JobManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Description: Adjustments similar to FLINK-34152, but simpler because we 
only need to adjust heap memory and metaspace for the JobManager.  (was: 
Similarly to)

> Tune JobManager memory of autoscaled jobs
> -
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Adjustments similar to FLINK-34152, but simpler because we only need to 
> adjust heap memory and metaspace for the JobManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Description: 
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their dispense. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

[https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]

> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Description: Similarly to  (was: The current autoscaling algorithm adjusts 
the parallelism of the job task vertices according to the processing needs. By 
adjusting the parallelism, we systematically scale the amount of CPU for a 
task. At the same time, we also indirectly change the amount of memory tasks 
have at their dispense. However, there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

 )

> Tune JobManager memory of autoscaled jobs
> -
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Similarly to



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34539) Tune JobManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34539:
--

 Summary: Tune JobManager memory of autoscaled jobs
 Key: FLINK-34539
 URL: https://issues.apache.org/jira/browse/FLINK-34539
 Project: Flink
  Issue Type: Sub-task
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their dispense. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Parent: FLINK-34538
Issue Type: Sub-task  (was: New Feature)

> Tune TaskManager memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34538) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34538:
--

 Summary: Tune memory of autoscaled jobs
 Key: FLINK-34538
 URL: https://issues.apache.org/jira/browse/FLINK-34538
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their dispense. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34538) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Description: (was: The current autoscaling algorithm adjusts the 
parallelism of the job task vertices according to the processing needs. By 
adjusting the parallelism, we systematically scale the amount of CPU for a 
task. At the same time, we also indirectly change the amount of memory tasks 
have at their dispense. However, there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

 )

> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory of autoscaled jobs  (was: Tune memory of 
autoscaled jobs)

> Tune TaskManager memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory of autoscaled jobs  (was: Tune memory of 
autoscaled jobs)

> Tune TaskManager memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune memory of autoscaled jobs  (was: Tune TaskManager memory of 
autoscaled jobs)

> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Description: 
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their dispense. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

 

  was:
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their dispense. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

 


> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Description: 
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their dispense. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

 

  was:
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their dispense. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

A tuning algorithm could look like this:
h2. 1. Establish a memory baseline

We observe the average heap memory usage at task managers.
h2. 2. Calculate memory usage per record

The memory requirements per record can be estimated by calculating this ratio:
{noformat}
memory_per_rec = sum(heap_usage) / sum(records_processed)
{noformat}
This ratio is surprisingly constant based off empirical data.
h2. 3. Scale memory proportionally to the per-record memory needs
{noformat}
memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
{noformat}
A minimum memory limit needs to be added to avoid scaling down memory too much. 
The max memory per TM should be equal to the initially defined user-specified 
limit from the ResourceSpec. 
{noformat}
memory_per_tm = max(min_limit, memory_per_tm)
memory_per_tm = min(max_limit, memory_per_tm) {noformat}


> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning

2024-02-22 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819591#comment-17819591
 ] 

Maximilian Michels commented on FLINK-34471:


Yes, that is exactly what I meant: Moving from FORWARD to RESCALE. The 
workaround you described makes sense.

> Tune network memory as part of Autoscaler Memory Tuning
> ---
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning

2024-02-22 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819578#comment-17819578
 ] 

Maximilian Michels edited comment on FLINK-34471 at 2/22/24 10:18 AM:
--

Thanks! That would be very helpful – I don’t mind at all. I realized point to 
point connections might be a bit tricky when the parallelism between tasks 
changes from being equal to being different because at runtime we then switch 
to a different partitioner. So worst case we could run out of network buffers 
in this scenario. I’m curious how you want to solve this. We probably need to 
add extra buffers to account for this edge case. 


was (Author: mxm):
Thanks! That would be very helpful – I don’t mind at all. I realized point to 
point connections might be a bit tricky when the parallelism between tasks 
changes from being equal to changing because at runtime we then switch to a 
different partitioner. So worst case we could run out of network buffers in 
this scenario. I’m curious how you want to solve this. We probably need to add 
extra buffers to account for this edge case. 

> Tune network memory as part of Autoscaler Memory Tuning
> ---
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning

2024-02-22 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819578#comment-17819578
 ] 

Maximilian Michels commented on FLINK-34471:


Thanks! That would be very helpful – I don’t mind at all. I realized point to 
point connections might be a bit tricky when the parallelism between tasks 
changes from being equal to changing because at runtime we then switch to a 
different partitioner. So worst case we could run out of network buffers in 
this scenario. I’m curious how you want to solve this. We probably need to add 
extra buffers to account for this edge case. 

> Tune network memory as part of Autoscaler Memory Tuning
> ---
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning

2024-02-21 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819306#comment-17819306
 ] 

Maximilian Michels commented on FLINK-34471:


I think in addition to the fine-grained approach described in the doc, we can 
do a first implementation which simply uses 
[https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/#network-buffer-lifecycle]
 and assumes an ALL_TO_ALL relationship. This may not optimize down to the last 
byte but still gives great savings over the default.

> Tune network memory as part of Autoscaler Memory Tuning
> ---
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning

2024-02-21 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34471:
---
Summary: Tune network memory as part of Autoscaler Memory Tuning  (was: 
Tune the network memory in Autoscaler)

> Tune network memory as part of Autoscaler Memory Tuning
> ---
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34471) Tune the network memory in Autoscaler

2024-02-21 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34471:
---
Summary: Tune the network memory in Autoscaler  (was: Tune the network 
memroy in Autoscaler)

> Tune the network memory in Autoscaler
> -
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34471) Tune the network memroy in Autoscaler

2024-02-21 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819233#comment-17819233
 ] 

Maximilian Michels commented on FLINK-34471:


Thanks Rui!

> Tune the network memroy in Autoscaler
> -
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34471) Tune the network memroy in Autoscaler

2024-02-21 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-34471:
--

Assignee: Maximilian Michels

> Tune the network memroy in Autoscaler
> -
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33092) Improve the resource-stabilization-timeout mechanism when rescale a job for Adaptive Scheduler

2024-01-31 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812778#comment-17812778
 ] 

Maximilian Michels commented on FLINK-33092:


+1 waiting on resources in the Executing state.

I think we need to just change the ScalingControler to delay triggering the 
actual rescale process: 
[https://github.com/apache/flink/blob/cb9e220c2291088459f0281aa8e8e8584436a9b2/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/RescalingController.java#L37]

Right now, it triggers immediately on parallelism change. [~dmvk] can probably 
answer this.

> Improve the resource-stabilization-timeout mechanism when rescale a job for 
> Adaptive Scheduler
> --
>
> Key: FLINK-33092
> URL: https://issues.apache.org/jira/browse/FLINK-33092
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Attachments: image-2023-09-15-14-43-35-104.png
>
>
> !image-2023-09-15-14-43-35-104.png|width=916,height=647!
> h1. 1. Propose
> The above is the state transition graph when rescale a job in Adaptive 
> Scheduler.
> In brief, when we trigger a rescale, the job will wait 
> _*resource-stabilization-timeout*_ in WaitingForResources State when it has 
> sufficient resources and it doesn't have the desired resource.
> If the _*resource-stabilization-timeout mechanism*_ is moved into the 
> Executing State, the rescale downtime will be significantly reduced.
> h1. 2. Why the downtime is long?
> Currently, when rescale a job:
>  * The Executing will transition to Restarting
>  * The Restarting will cancel this job first.
>  * The Restarting will transition to WaitingForResources after the whole job 
> is terminal.
>  * When this job has sufficient resources and it doesn't have the desired 
> resource, the WaitingForResources needs to wait  
> _*resource-stabilization-timeout*_ .
>  * WaitingForResources will transition to CreatingExecutionGraph after  
> resource-stabilization-timeout.
> The problem is the job isn't running during the 
> resource-stabilization-timeout phase.
> h1. 3. How to reduce the downtime?
> We can move the _*resource-stabilization-timeout mechanism*_ into the 
> Executing State when trigger a rescale. It means:
>  * When this job has desired resources, the Executing can rescale directly.
>  * When this job has sufficient resources and it doesn't have the desired 
> resource, we can rescale after _*resource-stabilization-timeout.*_
>  * The WaitingForResources will ignore the resource-stabilization-timeout 
> after this improvement.
> The resource-stabilization-timeout works before cancel job, so the rescale 
> downtime will be significantly reduced.
>  
> Note: the resource-stabilization-timeout still works in WaitingForResources 
> when start a job. It's just changed when rescale a job.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34266) Output ratios should be computed over the whole metric window instead of averaged

2024-01-29 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811988#comment-17811988
 ] 

Maximilian Michels commented on FLINK-34266:


Yes, using the total sum and substracting the start from the end observation is 
the way to go for maximum precision. 

> Output ratios should be computed over the whole metric window instead of 
> averaged
> -
>
> Key: FLINK-34266
> URL: https://issues.apache.org/jira/browse/FLINK-34266
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Gyula Fora
>Priority: Critical
>
> Currently Output ratios are computed during metric collection based on the 
> current in/out metrics an stored as part of the collected metrics.
> During evaluation the output ratios previously computed are then averaged 
> together in the metric window. This however leads to incorrect computation 
> due to the nature of the computation and averaging.
> Example:
> Let's look at a window operator that simply sorts and re-emits events in 
> windows. During the window collection phase, output ratio will be computed 
> and stored as 0. During the window computation the output ratio will be 
> last_input_rate / window_size.  Depending on the last input rate observation 
> this can be off when averaged into any direction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34266) Output ratios should be computed over the whole metric window instead of averaged

2024-01-29 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811921#comment-17811921
 ] 

Maximilian Michels commented on FLINK-34266:


The way I understand the code is that for every observation, we will store the 
total output rate of every vertex. During metric window evaluation, we will 
average all of those. That is in line with how all the code works.

I agree 100% that all metrics should be observed over the entire metric window. 
So rates should be computed by measuring the number of records produced at the 
start and at the end up the window, then subtracting them from each other.

This request seems analogue to FLINK-34213 but for rates instead of busy time. 
Is that fair to say?

> Output ratios should be computed over the whole metric window instead of 
> averaged
> -
>
> Key: FLINK-34266
> URL: https://issues.apache.org/jira/browse/FLINK-34266
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Gyula Fora
>Priority: Critical
>
> Currently Output ratios are computed during metric collection based on the 
> current in/out metrics an stored as part of the collected metrics.
> During evaluation the output ratios previously computed are then averaged 
> together in the metric window. This however leads to incorrect computation 
> due to the nature of the computation and averaging.
> Example:
> Let's look at a window operator that simply sorts and re-emits events in 
> windows. During the window collection phase, output ratio will be computed 
> and stored as 0. During the window computation the output ratio will be 
> last_input_rate / window_size.  Depending on the last input rate observation 
> this can be off when averaged into any direction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34213) Consider using accumulated busy time instead of busyMsPerSecond

2024-01-23 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810006#comment-17810006
 ] 

Maximilian Michels commented on FLINK-34213:


If we had to query metrics per vertex, that would be too expensive, but it 
seems like that is not necessary. Here is an exemplary REST API response to the 
{{/jobs/}} endpoint:

{noformat}
{
"jid": "b4f918c2a0312de9fe7369a7db093e96",
"name": "-",
"isStoppable": false,
"state": "RUNNING",
"start-time": 1705094021727,
"end-time": -1,
"duration": 928985186,
"maxParallelism": 1,
"now": 1706023006913,
"timestamps": {
"SUSPENDED": 0,
"RUNNING": 1705094036134,
"FAILING": 0,
"CANCELED": 0,
"CANCELLING": 0,
"CREATED": 1705094035034,
"INITIALIZING": 1705094021727,
"FAILED": 0,
"RESTARTING": 0,
"RECONCILING": 0,
"FINISHED": 0
},
"vertices": [
{
"id": "db1f263dc155338dc2a9622a2e06d115",
"name": "",
"maxParallelism": 1,
"parallelism": 18,
"status": "RUNNING",
"start-time": 1705094037437,
"end-time": -1,
"duration": 928969476,
"tasks": {
"CANCELED": 0,
"DEPLOYING": 0,
"CANCELING": 0,
"RECONCILING": 0,
"FINISHED": 0,
"SCHEDULED": 0,
"CREATED": 0,
"INITIALIZING": 0,
"FAILED": 0,
"RUNNING": 18
},
"metrics": {
"read-bytes": 0,
"read-bytes-complete": true,
"write-bytes": 2907138853415272,
"write-bytes-complete": true,
"read-records": 0,
"read-records-complete": true,
"write-records": 229589536334,
"write-records-complete": true,
"accumulated-backpressured-time": 1533744940,
"accumulated-idle-time": 10026044858,
"accumulated-busy-time": 5161601268
}
},
   ...
]
}
{noformat}

Note the accumulated backpressure/idle time.

> Consider using accumulated busy time instead of busyMsPerSecond
> ---
>
> Key: FLINK-34213
> URL: https://issues.apache.org/jira/browse/FLINK-34213
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Priority: Minor
>
> We might achieve much better accuracy if we used the accumulated busy time 
> metrics from Flink, instead of the momentarily collected ones.
> We would use the diff between the last accumulated and the current 
> accumulated busy time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34213) Consider using accumulated busy time instead of busyMsPerSecond

2024-01-23 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34213:
--

 Summary: Consider using accumulated busy time instead of 
busyMsPerSecond
 Key: FLINK-34213
 URL: https://issues.apache.org/jira/browse/FLINK-34213
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels


We might achieve much better accuracy if we used the accumulated busy time 
metrics from Flink, instead of the momentarily collected ones.

We would use the diff between the last accumulated and the current accumulated 
busy time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs

2024-01-22 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Description: 
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their dispense. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

A tuning algorithm could look like this:
h2. 1. Establish a memory baseline

We observe the average heap memory usage at task managers.
h2. 2. Calculate memory usage per record

The memory requirements per record can be estimated by calculating this ratio:
{noformat}
memory_per_rec = sum(heap_usage) / sum(records_processed)
{noformat}
This ratio is surprisingly constant based off empirical data.
h2. 3. Scale memory proportionally to the per-record memory needs
{noformat}
memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
{noformat}
A minimum memory limit needs to be added to avoid scaling down memory too much. 
The max memory per TM should be equal to the initially defined user-specified 
limit from the ResourceSpec. 
{noformat}
memory_per_tm = max(min_limit, memory_per_tm)
memory_per_tm = min(max_limit, memory_per_tm) {noformat}

  was:
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their dispense. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory then we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

A tuning algorithm could look like this:
h2. 1. Establish a memory baseline

We observe the average heap memory usage at task managers.
h2. 2. Calculate memory usage per record

The memory requirements per record can be estimated by calculating this ratio:
{noformat}
memory_per_rec = sum(heap_usage) / sum(records_processed)
{noformat}
This ratio is surprisingly constant based off empirical data.
h2. 3. Scale memory proportionally to the per-record memory needs
{noformat}
memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
{noformat}
A minimum memory limit needs to be added to avoid scaling down memory too much. 
The max memory per TM should be equal to the initially defined user-specified 
limit from the ResourceSpec. 
{noformat}
memory_per_tm = max(min_limit, memory_per_tm)
memory_per_tm = min(max_limit, memory_per_tm) {noformat}


> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> 

[jira] [Created] (FLINK-34152) Tune memory of autoscaled jobs

2024-01-18 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34152:
--

 Summary: Tune memory of autoscaled jobs
 Key: FLINK-34152
 URL: https://issues.apache.org/jira/browse/FLINK-34152
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their dispense. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory then we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

A tuning algorithm could look like this:
h2. 1. Establish a memory baseline

We observe the average heap memory usage at task managers.
h2. 2. Calculate memory usage per record

The memory requirements per record can be estimated by calculating this ratio:
{noformat}
memory_per_rec = sum(heap_usage) / sum(records_processed)
{noformat}
This ratio is surprisingly constant based off empirical data.
h2. 3. Scale memory proportionally to the per-record memory needs
{noformat}
memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
{noformat}
A minimum memory limit needs to be added to avoid scaling down memory too much. 
The max memory per TM should be equal to the initially defined user-specified 
limit from the ResourceSpec. 
{noformat}
memory_per_tm = max(min_limit, memory_per_tm)
memory_per_tm = min(max_limit, memory_per_tm) {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune heap memory of autoscaled jobs

2024-01-18 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune heap memory of autoscaled jobs  (was: Tune memory of 
autoscaled jobs)

> Tune heap memory of autoscaled jobs
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory then we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> A tuning algorithm could look like this:
> h2. 1. Establish a memory baseline
> We observe the average heap memory usage at task managers.
> h2. 2. Calculate memory usage per record
> The memory requirements per record can be estimated by calculating this ratio:
> {noformat}
> memory_per_rec = sum(heap_usage) / sum(records_processed)
> {noformat}
> This ratio is surprisingly constant based off empirical data.
> h2. 3. Scale memory proportionally to the per-record memory needs
> {noformat}
> memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
> {noformat}
> A minimum memory limit needs to be added to avoid scaling down memory too 
> much. The max memory per TM should be equal to the initially defined 
> user-specified limit from the ResourceSpec. 
> {noformat}
> memory_per_tm = max(min_limit, memory_per_tm)
> memory_per_tm = min(max_limit, memory_per_tm) {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs

2024-01-18 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune memory of autoscaled jobs  (was: Tune heap memory of 
autoscaled jobs)

> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory then we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> A tuning algorithm could look like this:
> h2. 1. Establish a memory baseline
> We observe the average heap memory usage at task managers.
> h2. 2. Calculate memory usage per record
> The memory requirements per record can be estimated by calculating this ratio:
> {noformat}
> memory_per_rec = sum(heap_usage) / sum(records_processed)
> {noformat}
> This ratio is surprisingly constant based off empirical data.
> h2. 3. Scale memory proportionally to the per-record memory needs
> {noformat}
> memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
> {noformat}
> A minimum memory limit needs to be added to avoid scaling down memory too 
> much. The max memory per TM should be equal to the initially defined 
> user-specified limit from the ResourceSpec. 
> {noformat}
> memory_per_tm = max(min_limit, memory_per_tm)
> memory_per_tm = min(max_limit, memory_per_tm) {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34151) Integrate Karpenter resource limits into cluster capacity check

2024-01-18 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34151:
--

 Summary: Integrate Karpenter resource limits into cluster capacity 
check
 Key: FLINK-34151
 URL: https://issues.apache.org/jira/browse/FLINK-34151
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


FLINK-33771 added cluster capacity checking for Flink Autoscaling decisions. 
The checks respect the scaling limits of the Kubernetes Cluster Autoscaler. 

We should also support Karpenter-based resource checks, as Karpenter is the 
preferred method of expanding the cluster size in some environments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33771) Add cluster capacity awareness to Autoscaler

2024-01-18 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-33771.

Resolution: Fixed

> Add cluster capacity awareness to Autoscaler
> 
>
> Key: FLINK-33771
> URL: https://issues.apache.org/jira/browse/FLINK-33771
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> To avoid starvation of pipelines when the Kubernetes cluster runs out of 
> resources, new scaling attempts should be stopped. 
> The Rescaling API will probably prevent most of this cases but we will also 
> have to double-check there. 
> For the config-based parallelism overrides, we have pretty good heuristics in 
> the operator to check in Kubernetes for the approximate number of free 
> cluster resources, the max cluster scaleup for the Cluster Autoscaler, and 
> the required scaling costs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31977) If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary

2024-01-11 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805650#comment-17805650
 ] 

Maximilian Michels commented on FLINK-31977:


I think this is related to FLINK-33993. The name of the configuration option is 
a bit misleading, as effectiveness detection is always on but scalings are only 
blocked when the option is set to {{true}}.

> If scaling.effectiveness.detection.enabled is false, the call to the 
> detectIneffectiveScaleUp() function is unnecessary
> ---
>
> Key: FLINK-31977
> URL: https://issues.apache.org/jira/browse/FLINK-31977
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Affects Versions: 1.17.0
>Reporter: Tan Kim
>Priority: Minor
>
> The code below is a function to detect inefficient scaleups.
> It returns a result if the value of SCALING_EFFECTIVENESS_DETECTION_ENABLED 
> (scaling.effectiveness.detection.enabled) is true after all the necessary 
> computations for detection, but this is an unnecessary computation.
> {code:java}
> JobVertexScaler.java #175
> private boolean detectIneffectiveScaleUp(
> AbstractFlinkResource resource,
> JobVertexID vertex,
> Configuration conf,
> Map evaluatedMetrics,
> ScalingSummary lastSummary) {
> double lastProcRate = 
> lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // 
> 22569.315633422066
> double lastExpectedProcRate =
> 
> lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // 
> 37340.0
> var currentProcRate = 
> evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
> // To judge the effectiveness of the scale up operation we compute how 
> much of the expected
> // increase actually happened. For example if we expect a 100 increase in 
> proc rate and only
> // got an increase of 10 we only accomplished 10% of the desired 
> increase. If this number is
> // below the threshold, we mark the scaling ineffective.
> double expectedIncrease = lastExpectedProcRate - lastProcRate;
> double actualIncrease = currentProcRate - lastProcRate;
> boolean withinEffectiveThreshold =
> (actualIncrease / expectedIncrease)
> >= 
> conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
> if (withinEffectiveThreshold) {
> return false;
> }
> var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);
> eventRecorder.triggerEvent(
> resource,
> EventRecorder.Type.Normal,
> EventRecorder.Reason.IneffectiveScaling,
> EventRecorder.Component.Operator,
> message);
> if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
> LOG.info(message);
> return true;
> } else {
> return false;
> }
> } {code}
> In the call to the detectIneffectiveScaleUp function, I would suggest 
> checking SCALING_EFFECTIVENESS_DETECTION_ENABLED first, as follows.
> {code:java}
> JobVertexScaler.java #150
> if (currentParallelism == lastSummary.getNewParallelism() && 
> lastSummary.isScaledUp()) {
>     if (scaledUp) {
>         
> if(conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
>             return detectIneffectiveScaleUp(resource, vertex, conf, 
> evaluatedMetrics, lastSummary);
>         } else {
>             return true;
>         }
>     } else {
>         return detectImmediateScaleDownAfterScaleUp(vertex, conf, 
> lastScalingTs);
>     }
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33993) Ineffective scaling detection events are misleading

2024-01-05 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed FLINK-33993.
--
Resolution: Fixed

> Ineffective scaling detection events are misleading
> ---
>
> Key: FLINK-33993
> URL: https://issues.apache.org/jira/browse/FLINK-33993
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> When the ineffective scaling decision feature is turned off, events are 
> regenerated which look like this:
> {noformat}
> Skipping further scale up after ineffective previous scale up for 
> 65c763af14a952c064c400d516c25529
> {noformat}
> This is misleading because no action will be taken. It is fair to inform 
> users about ineffective scale up even when the feature is disabled but a 
> different message should be printed to convey that no action will be taken.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33993) Ineffective scaling detection events are misleading

2024-01-04 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33993:
--

 Summary: Ineffective scaling detection events are misleading
 Key: FLINK-33993
 URL: https://issues.apache.org/jira/browse/FLINK-33993
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler, Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


When the ineffective scaling decision feature is turned off, events are 
regenerated which look like this:

{noformat}
Skipping further scale up after ineffective previous scale up for 
65c763af14a952c064c400d516c25529
{noformat}

This is misleading because no action will be taken. It is fair to inform users 
about ineffective scale up even when the feature is disabled but a different 
message should be printed to convey that no action will be taken.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2024-01-02 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801843#comment-17801843
 ] 

Maximilian Michels commented on FLINK-33940:


[~Zhanghao Chen] Even though the factor only affects high parallelism operators 
> 840, I wonder whether we need to leave more room for scaleup. But I don't 
have a strong opinion.

{quote}
IIUC, when the parallelism of one job is very small(it's 1 or 2) and the max 
parallelism is 1024, one subtask will have 1024 keyGroups. From state backend 
side, too many key groups may effect the performance. (This is my concern to 
change it by default in Flink Community.)
{quote}

[~fanrui] I think we need to find out how big the performance impact actually 
is when jumping from 128 to 840 key groups. But 128 may just have been a very 
conservative number.

> Update the auto-derivation rule of max parallelism for enlarged upscaling 
> space
> ---
>
> Key: FLINK-33940
> URL: https://issues.apache.org/jira/browse/FLINK-33940
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>
> *Background*
> The choice of the max parallelism of an stateful operator is important as it 
> limits the upper bound of the parallelism of the opeartor while it can also 
> add extra overhead when being set too large. Currently, the max parallelism 
> of an opeartor is either fixed to a value specified by API core / pipeline 
> option or auto-derived with the following rules:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}}
> *Problem*
> Recently, the elasticity of Flink jobs is becoming more and more valued by 
> users. The current auto-derived max parallelism was introduced a time time 
> ago and only allows the operator parallelism to be roughly doubled, which is 
> not desired for elasticity. Setting an max parallelism manually may not be 
> desired as well: users may not have the sufficient expertise to select a good 
> max-parallelism value.
> *Proposal*
> Update the auto-derivation rule of max parallelism to derive larger max 
> parallelism for better elasticity experience out of the box. A candidate is 
> as follows:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), 
> 32767)}}
> Looking forward to your opinions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2024-01-02 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801790#comment-17801790
 ] 

Maximilian Michels commented on FLINK-33940:


[~Zhanghao Chen] Thank you for the proposal. I agree with using highly 
composite numbers, as this will provide more flexibility to the autoscaler. I'm 
not sure about the {{operatorParallelism * 5}}. What is the rational for 
selecting this factor? Why not {{*10}}? 

> Update the auto-derivation rule of max parallelism for enlarged upscaling 
> space
> ---
>
> Key: FLINK-33940
> URL: https://issues.apache.org/jira/browse/FLINK-33940
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>
> *Background*
> The choice of the max parallelism of an stateful operator is important as it 
> limits the upper bound of the parallelism of the opeartor while it can also 
> add extra overhead when being set too large. Currently, the max parallelism 
> of an opeartor is either fixed to a value specified by API core / pipeline 
> option or auto-derived with the following rules:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}}
> *Problem*
> Recently, the elasticity of Flink jobs is becoming more and more valued by 
> users. The current auto-derived max parallelism was introduced a time time 
> ago and only allows the operator parallelism to be roughly doubled, which is 
> not desired for elasticity. Setting an max parallelism manually may not be 
> desired as well: users may not have the sufficient expertise to select a good 
> max-parallelism value.
> *Proposal*
> Update the auto-derivation rule of max parallelism to derive larger max 
> parallelism for better elasticity experience out of the box. A candidate is 
> as follows:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), 
> 32767)}}
> Looking forward to your opinions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33795) Add new config to forbid autoscaling in certain periods of a day

2024-01-02 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-33795.

Fix Version/s: kubernetes-operator-1.8.0
 Assignee: yonghua jian
   Resolution: Fixed

> Add new config to forbid autoscaling in certain periods of a day
> 
>
> Key: FLINK-33795
> URL: https://issues.apache.org/jira/browse/FLINK-33795
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yonghua jian
>Assignee: yonghua jian
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Add new config to forbid autoscaling in certain periods of a day so that we 
> keep flink job unaffected by autoscaling's job restart behavior during this 
> periods



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33917) IllegalArgumentException: hostname can't be null

2024-01-02 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-33917.

Fix Version/s: kubernetes-operator-1.8.0
 Assignee: Tom
   Resolution: Fixed

> IllegalArgumentException: hostname can't be null
> 
>
> Key: FLINK-33917
> URL: https://issues.apache.org/jira/browse/FLINK-33917
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Tom
>Assignee: Tom
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> In certain scenarios, if the hostname contains certain characters it will 
> throw an exception when it tries to initialize the `InetSocketAddress`
>  
> {code:java}
> java.lang.IllegalArgumentException: hostname can't be null    at 
> java.base/java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149)
>     at 
> java.base/java.net.InetSocketAddress.(InetSocketAddress.java:216) {code}
>  
> [https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L236]
>  
> {code:java}
>        @Override
>     public boolean isJobManagerPortReady(Configuration config) {
>         final URI uri;
>         try (var clusterClient = getClusterClient(config)) {
>             uri = URI.create(clusterClient.getWebInterfaceURL());
>         } catch (Exception ex) {
>             throw new FlinkRuntimeException(ex);
>         }
>         SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), 
> uri.getPort());
>         Socket socket = new Socket();
>         try {
>             socket.connect(socketAddress, 1000);
>             socket.close();
>             return true;
>         } catch (IOException e) {
>             return false;
>         }
>     }
>   {code}
>  
> Here's a simple test to reproduce
>  
> URL
> {code:java}
> @ParameterizedTest
> @ValueSource(
> strings = {"http://127.0.0.1:8081;, "http://123-dev:8081;, 
> "http://dev-test.abc:8081;, "http://dev-test.1a:8081;, 
> "http://dev-test.abc01:8081"})
> void testURLAddresses(String inputAddress) {
> assertDoesNotThrow(
> () -> {
> final URL url = new URL(inputAddress);
> new InetSocketAddress(url.getHost(), url.getPort());
> });
> } {code}
>  
> URI
>  
> {code:java}
> @ParameterizedTest
> @ValueSource(
> strings = {"http://127.0.0.1:8081;, "http://123-dev:8081;, 
> "http://dev-test.abc:8081;, "http://dev-test.1a:8081;, 
> "http://dev-test.abc01:8081"})
> void testURIAddresses(String inputAddress) {
> assertDoesNotThrow(
> () -> {
> final URI uri = new URI(inputAddress);
> new InetSocketAddress(uri.getHost(), uri.getPort());
> });
> }  {code}
>  
> All test cases past except for  "http://dev-test.1a:8081; which is a valid 
> flink host url, but not a valid URI
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33917) IllegalArgumentException: hostname can't be null

2023-12-22 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799701#comment-17799701
 ] 

Maximilian Michels commented on FLINK-33917:


{{new URI("123-test").getHost()}} returns null. 

I’m not 100% sure this is a JDK bug. There may be some ambiguity when resolving 
URIs without all spec parts. But let’s see what upstream says. 

> IllegalArgumentException: hostname can't be null
> 
>
> Key: FLINK-33917
> URL: https://issues.apache.org/jira/browse/FLINK-33917
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Tom
>Priority: Major
>  Labels: pull-request-available
>
> In certain scenarios, if the hostname contains certain characters it will 
> throw an exception when it tries to initialize the `InetSocketAddress`
>  
> {code:java}
> java.lang.IllegalArgumentException: hostname can't be null    at 
> java.base/java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149)
>     at 
> java.base/java.net.InetSocketAddress.(InetSocketAddress.java:216) {code}
>  
> [https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L236]
>  
> {code:java}
>        @Override
>     public boolean isJobManagerPortReady(Configuration config) {
>         final URI uri;
>         try (var clusterClient = getClusterClient(config)) {
>             uri = URI.create(clusterClient.getWebInterfaceURL());
>         } catch (Exception ex) {
>             throw new FlinkRuntimeException(ex);
>         }
>         SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), 
> uri.getPort());
>         Socket socket = new Socket();
>         try {
>             socket.connect(socketAddress, 1000);
>             socket.close();
>             return true;
>         } catch (IOException e) {
>             return false;
>         }
>     }
>   {code}
>  
> Here's a simple test to reproduce
>  
> URL
> {code:java}
> @ParameterizedTest
> @ValueSource(
> strings = {"http://127.0.0.1:8081;, "http://123-dev:8081;, 
> "http://dev-test.abc:8081;, "http://dev-test.1a:8081;, 
> "http://dev-test.abc01:8081"})
> void testURLAddresses(String inputAddress) {
> assertDoesNotThrow(
> () -> {
> final URL url = new URL(inputAddress);
> new InetSocketAddress(url.getHost(), url.getPort());
> });
> } {code}
>  
> URI
>  
> {code:java}
> @ParameterizedTest
> @ValueSource(
> strings = {"http://127.0.0.1:8081;, "http://123-dev:8081;, 
> "http://dev-test.abc:8081;, "http://dev-test.1a:8081;, 
> "http://dev-test.abc01:8081"})
> void testURIAddresses(String inputAddress) {
> assertDoesNotThrow(
> () -> {
> final URI uri = new URI(inputAddress);
> new InetSocketAddress(uri.getHost(), uri.getPort());
> });
> }  {code}
>  
> All test cases past except for  "http://dev-test.1a:8081; which is a valid 
> flink host url, but not a valid URI
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33917) IllegalArgumentException: hostname can't be null

2023-12-21 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799352#comment-17799352
 ] 

Maximilian Michels commented on FLINK-33917:


The description doesn’t describe under which circumstances the host name can be 
parsed as null. One example is {{new URI("123-test")}} which will return a null 
host name because the string is parsed as a URI path. Flink itself returns a 
stringified URL object. So using URL instead works fine. 

> IllegalArgumentException: hostname can't be null
> 
>
> Key: FLINK-33917
> URL: https://issues.apache.org/jira/browse/FLINK-33917
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Tom
>Priority: Major
>  Labels: pull-request-available
>
> In certain scenarios, if the hostname contains certain characters it will 
> throw an exception when it tries to initialize the `InetSocketAddress`
>  
> {code:java}
> java.lang.IllegalArgumentException: hostname can't be null    at 
> java.base/java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149)
>     at 
> java.base/java.net.InetSocketAddress.(InetSocketAddress.java:216) {code}
>  
> [https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L236]
>  
> {code:java}
>        @Override
>     public boolean isJobManagerPortReady(Configuration config) {
>         final URI uri;
>         try (var clusterClient = getClusterClient(config)) {
>             uri = URI.create(clusterClient.getWebInterfaceURL());
>         } catch (Exception ex) {
>             throw new FlinkRuntimeException(ex);
>         }
>         SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), 
> uri.getPort());
>         Socket socket = new Socket();
>         try {
>             socket.connect(socketAddress, 1000);
>             socket.close();
>             return true;
>         } catch (IOException e) {
>             return false;
>         }
>     }
>   {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33770) Autoscaler logs are full of deprecated key warnings

2023-12-11 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-33770.

Resolution: Fixed

> Autoscaler logs are full of deprecated key warnings
> ---
>
> Key: FLINK-33770
> URL: https://issues.apache.org/jira/browse/FLINK-33770
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> We moved all autoscaler configuration from 
> {{kubernetes.operator.job.autoscaler.*}} to {{job.autoscaler.*}}. 
> With the latest release, the logs are full with logs like this:
> {noformat}
> level:  WARN 
> logger:  org.apache.flink.configuration.Configuration 
> message:  Config uses deprecated configuration key 
> 'kubernetes.operator.job.autoscaler.target.utilization' instead of proper key 
> 'job.autoscaler.target.utilization' 
> {noformat}
> The reason is that the configuration is loaded for every reconciliation.
> This configuration is already widely adopted across hundreds of pipelines. I 
> propose to remove the deprecation from the config keys and make them 
> "fallback" keys instead which removes the deprecation warning.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31400) Add autoscaler integration for Iceberg source

2023-12-07 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-31400:
--

Assignee: Mason Chen

> Add autoscaler integration for Iceberg source
> -
>
> Key: FLINK-31400
> URL: https://issues.apache.org/jira/browse/FLINK-31400
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Mason Chen
>Priority: Major
>
> A very critical part in the scaling algorithm is setting the source 
> processing rate correctly such that the Flink pipeline can keep up with the 
> ingestion rate. The autoscaler does that by looking at the {{pendingRecords}} 
> Flink source metric. Even if that metric is not available, the source can 
> still be sized according to the busyTimeMsPerSecond metric, but there will be 
> no backlog information available. For Kafka, the autoscaler also determines 
> the number of partitions to avoid scaling higher than the maximum number of 
> partitions.
> In order to support a wider range of use cases, we should investigate an 
> integration with the Iceberg source. As far as I know, it does not expose the 
> pedingRecords metric, nor does the autoscaler know about other constraints, 
> e.g. the maximum number of open files.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31502) Limit the number of concurrent scale operations to reduce cluster churn

2023-12-07 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-31502:
---
Fix Version/s: kubernetes-operator-1.8.0
   (was: kubernetes-operator-1.5.0)

> Limit the number of concurrent scale operations to reduce cluster churn
> ---
>
> Key: FLINK-31502
> URL: https://issues.apache.org/jira/browse/FLINK-31502
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Until we move to using the upcoming Rescale API which recycles pods, we need 
> to be mindful with how many deployments we scale at the same time because 
> each of them is going to give up all its pods and require the new number of 
> required pods. 
> This can cause churn in the cluster and temporary lead to "unallocatable" 
> pods which triggers the k8s cluster autoscaler to add more cluster nodes. 
> That is often not desirable because the actual required resources after the 
> scaling have been settled, are lower.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33771) Add cluster capacity awareness to Autoscaler

2023-12-07 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33771:
---
Description: 
To avoid starvation of pipelines when the Kubernetes cluster runs out of 
resources, new scaling attempts should be stopped. 

The Rescaling API will probably prevent most of this cases but we will also 
have to double-check there. 

For the config-based parallelism overrides, we have pretty good heuristics in 
the operator to check in Kubernetes for the approximate number of free cluster 
resources, the max cluster scaleup for the Cluster Autoscaler, and the required 
scaling costs.

  was:
To avoid starvation of pipelines when the Kubernetes cluster runs out of 
resources, new scaling attempts should be stopped. 

The Rescaling API will probably prevent most of this cases but we will also 
have to double-check there. 

For the config-based parallelism overrides, we have pretty good heuristics in 
the operator to check in Kubernetes for the approximate number of free cluster 
resources and the required scaling costs.


> Add cluster capacity awareness to Autoscaler
> 
>
> Key: FLINK-33771
> URL: https://issues.apache.org/jira/browse/FLINK-33771
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> To avoid starvation of pipelines when the Kubernetes cluster runs out of 
> resources, new scaling attempts should be stopped. 
> The Rescaling API will probably prevent most of this cases but we will also 
> have to double-check there. 
> For the config-based parallelism overrides, we have pretty good heuristics in 
> the operator to check in Kubernetes for the approximate number of free 
> cluster resources, the max cluster scaleup for the Cluster Autoscaler, and 
> the required scaling costs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33773) Add fairness to scaling decisions

2023-12-07 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33773:
--

 Summary: Add fairness to scaling decisions
 Key: FLINK-33773
 URL: https://issues.apache.org/jira/browse/FLINK-33773
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler, Deployment / Kubernetes
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


The current scaling logic is inherently unfair. In a scenario of heavy backlog, 
whichever pipelines come first, they will end up taking most of the resources. 
Some kind of fairness should be introduced, for example:

* Cap the max number of resulting pods at a % of the cluster resources
* Allow scale up round-robin across all pipelines



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33771) Add cluster capacity awareness to Autoscaler

2023-12-07 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33771:
--

 Summary: Add cluster capacity awareness to Autoscaler
 Key: FLINK-33771
 URL: https://issues.apache.org/jira/browse/FLINK-33771
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


To avoid starvation of pipelines when the Kubernetes cluster runs out of 
resources, new scaling attempts should be stopped. 

The Rescaling API will probably prevent most of this cases but we will also 
have to double-check there. 

For the config-based parallelism overrides, we have pretty good heuristics in 
the operator to check in Kubernetes for the approximate number of free cluster 
resources and the required scaling costs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-31502) Limit the number of concurrent scale operations to reduce cluster churn

2023-12-07 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reopened FLINK-31502:


Reopening because this is an actual issue.

> Limit the number of concurrent scale operations to reduce cluster churn
> ---
>
> Key: FLINK-31502
> URL: https://issues.apache.org/jira/browse/FLINK-31502
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.5.0
>
>
> Until we move to using the upcoming Rescale API which recycles pods, we need 
> to be mindful with how many deployments we scale at the same time because 
> each of them is going to give up all its pods and require the new number of 
> required pods. 
> This can cause churn in the cluster and temporary lead to "unallocatable" 
> pods which triggers the k8s cluster autoscaler to add more cluster nodes. 
> That is often not desirable because the actual required resources after the 
> scaling have been settled, are lower.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33770) Autoscaler logs are full of deprecated key warnings

2023-12-07 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33770:
--

 Summary: Autoscaler logs are full of deprecated key warnings
 Key: FLINK-33770
 URL: https://issues.apache.org/jira/browse/FLINK-33770
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler, Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


We moved all autoscaler configuration from 
{{kubernetes.operator.job.autoscaler.*}} to {{job.autoscaler.*}}. 

With the latest release, the logs are full with logs like this:

{noformat}
level:  WARN 
logger:  org.apache.flink.configuration.Configuration 
message:  Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.target.utilization' instead of proper key 
'job.autoscaler.target.utilization' 
{noformat}

The reason is that the configuration is loaded for every reconciliation.

This configuration is already widely adopted across hundreds of pipelines. I 
propose to remove the deprecation from the config keys and make them "fallback" 
keys instead which removes the deprecation warning.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33710) Autoscaler redeploys pipeline for a NOOP parallelism change

2023-12-06 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793800#comment-17793800
 ] 

Maximilian Michels commented on FLINK-33710:


Additional fix via ca1d8472d1a1e817268950dae079592581fa5b8f to prevent any 
existing deployments to get affected.

> Autoscaler redeploys pipeline for a NOOP parallelism change
> ---
>
> Key: FLINK-33710
> URL: https://issues.apache.org/jira/browse/FLINK-33710
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.7.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The operator supports two modes to apply autoscaler changes:
> # Use the internal Flink config {{pipeline.jobvertex-parallelism-overrides}} 
> # Make use of Flink's Rescale API 
> For (1), a string has to be generated for the Flink config with the actual 
> overrides. This string has to be deterministic for a given map. But it is not.
> Consider the following observed log:
> {noformat}
>   >>> Event  | Info| SPECCHANGED | SCALE change(s) detected (Diff: 
> FlinkDeploymentSpec[flinkConfiguration.pipeline.jobvertex-parallelism-overrides
>  : 
> 92542d1280187bd464274368a5f86977:3,9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,1652184ffd0522859c7840a24936847c:1
>  -> 
> 9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,92542d1280187bd464274368a5f86977:3,1652184ffd0522859c7840a24936847c:1]),
>  starting reconciliation. 
> {noformat}
> The overrides are identical but the order is different which triggers a 
> redeploy. This does not seem to happen often but some deterministic string 
> generation (e.g. sorting by key) is required to prevent any NOOP updates.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33522) Savepoint upgrade mode fails despite the savepoint succeeding

2023-12-01 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33522:
---
Fix Version/s: kubernetes-operator-1.8.0
   (was: kubernetes-operator-1.7.0)

> Savepoint upgrade mode fails despite the savepoint succeeding
> -
>
> Key: FLINK-33522
> URL: https://issues.apache.org/jira/browse/FLINK-33522
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.6.1
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Under certain circumstances, savepoint creation can succeed but the job fails 
> afterwards. One example is when there are messages being distributed by the 
> source coordinator to finished tasks. This is possibly a Flink bug although 
> it's not clear yet how to solve the issue.
> After the savepoint succeeded Flink fails the job like this:
> {noformat}
> Source (1/2) 
> (cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) 
> switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). 
> {noformat}
> {noformat}
> An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering 
> task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', 
> targetTask: Source (1/2) - execution #0
> Caused by:
> org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task 
> is not running, but in state FINISHED
>at 
> org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502)
>at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask
> {noformat}
> Inside the operator this is processed as:
> {noformat}
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException:
>  A savepoint has been created at: s3://..., but the corresponding job 
> 1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is 
> consistent, but might have uncommitted transactions. If you want to commit 
> the transaction please restart a job from this savepoint. 
>   
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
>   
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) 
>   
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319)
>  
>   
> org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121)
>  
>   
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223)
>  
>   
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122)
>  
>  
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163)
>   
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136)
>  
>   
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
>  
>   
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
>  
>   
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
>  
>   
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  
>   
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  
>   

[jira] [Commented] (FLINK-33522) Savepoint upgrade mode fails despite the savepoint succeeding

2023-12-01 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791975#comment-17791975
 ] 

Maximilian Michels commented on FLINK-33522:


Additional fix required via 51a91049b5f17f8a0b21e11feceb4410a97c50c1.

> Savepoint upgrade mode fails despite the savepoint succeeding
> -
>
> Key: FLINK-33522
> URL: https://issues.apache.org/jira/browse/FLINK-33522
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.6.1
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Under certain circumstances, savepoint creation can succeed but the job fails 
> afterwards. One example is when there are messages being distributed by the 
> source coordinator to finished tasks. This is possibly a Flink bug although 
> it's not clear yet how to solve the issue.
> After the savepoint succeeded Flink fails the job like this:
> {noformat}
> Source (1/2) 
> (cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) 
> switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). 
> {noformat}
> {noformat}
> An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering 
> task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', 
> targetTask: Source (1/2) - execution #0
> Caused by:
> org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task 
> is not running, but in state FINISHED
>at 
> org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502)
>at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask
> {noformat}
> Inside the operator this is processed as:
> {noformat}
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException:
>  A savepoint has been created at: s3://..., but the corresponding job 
> 1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is 
> consistent, but might have uncommitted transactions. If you want to commit 
> the transaction please restart a job from this savepoint. 
>   
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
>   
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) 
>   
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319)
>  
>   
> org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121)
>  
>   
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223)
>  
>   
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122)
>  
>  
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163)
>   
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136)
>  
>   
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
>  
>   
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
>  
>   
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
>  
>   
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  
>   
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  
>

[jira] [Created] (FLINK-33710) Autoscaler redeploys pipeline for a NOOP parallelism change

2023-11-30 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33710:
--

 Summary: Autoscaler redeploys pipeline for a NOOP parallelism 
change
 Key: FLINK-33710
 URL: https://issues.apache.org/jira/browse/FLINK-33710
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler, Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0, kubernetes-operator-1.6.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


The operator supports two modes to apply autoscaler changes:

# Use the internal Flink config {{pipeline.jobvertex-parallelism-overrides}} 
# Make use of Flink's Rescale API 

For (1), a string has to be generated for the Flink config with the actual 
overrides. This string has to be deterministic for a given map. But it is not.

Consider the following observed log:

{noformat}
  >>> Event  | Info| SPECCHANGED | SCALE change(s) detected (Diff: 
FlinkDeploymentSpec[flinkConfiguration.pipeline.jobvertex-parallelism-overrides 
: 
92542d1280187bd464274368a5f86977:3,9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,1652184ffd0522859c7840a24936847c:1
 -> 
9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,92542d1280187bd464274368a5f86977:3,1652184ffd0522859c7840a24936847c:1]),
 starting reconciliation. 
{noformat}

The overrides are identical but the order is different which triggers a 
redeploy. This does not seem to happen often but some deterministic string 
generation (e.g. sorting by key) is required to prevent any NOOP updates.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30593) Determine restart time on the fly for Autoscaler

2023-11-24 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-30593.

Fix Version/s: kubernetes-operator-1.8.0
   Resolution: Fixed

> Determine restart time on the fly for Autoscaler
> 
>
> Key: FLINK-30593
> URL: https://issues.apache.org/jira/browse/FLINK-30593
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Currently the autoscaler uses a preconfigured restart time for the job. We 
> should dynamically adjust this on the observered restart times for scale 
> operations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33572) Minimize ConfigMap API operations for autoscaler state

2023-11-16 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed FLINK-33572.
--
Resolution: Fixed

Implemented via 02840b96ef3116ea95a440af4f945398900d89df and 
ab0ec081eac86619a22632616fa4c01c074ecd33.

> Minimize ConfigMap API operations for autoscaler state
> --
>
> Key: FLINK-33572
> URL: https://issues.apache.org/jira/browse/FLINK-33572
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> The newly introduced flush operation after the refactoring the autoscaler 
> interfaces, optimizes the number of write operations to the underlying state 
> store. A couple of further optimizations:
> 1. Any writes should be deferred until flush is called.
> 2. The flush routine should detect whether a write is needed and writing if 
> there are no changes
> 3. Clearing state should only require one write operation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33572) Minimize ConfigMap API operations for autoscaler state

2023-11-16 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33572:
---
Release Note: Minimize ConfigMap operations for autoscaler state  (was: 
Implemented via 02840b96ef3116ea95a440af4f945398900d89df and 
ab0ec081eac86619a22632616fa4c01c074ecd33.)

> Minimize ConfigMap API operations for autoscaler state
> --
>
> Key: FLINK-33572
> URL: https://issues.apache.org/jira/browse/FLINK-33572
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> The newly introduced flush operation after the refactoring the autoscaler 
> interfaces, optimizes the number of write operations to the underlying state 
> store. A couple of further optimizations:
> 1. Any writes should be deferred until flush is called.
> 2. The flush routine should detect whether a write is needed and writing if 
> there are no changes
> 3. Clearing state should only require one write operation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-33572) Minimize ConfigMap API operations for autoscaler state

2023-11-16 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reopened FLINK-33572:


> Minimize ConfigMap API operations for autoscaler state
> --
>
> Key: FLINK-33572
> URL: https://issues.apache.org/jira/browse/FLINK-33572
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> The newly introduced flush operation after the refactoring the autoscaler 
> interfaces, optimizes the number of write operations to the underlying state 
> store. A couple of further optimizations:
> 1. Any writes should be deferred until flush is called.
> 2. The flush routine should detect whether a write is needed and writing if 
> there are no changes
> 3. Clearing state should only require one write operation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33572) Minimize ConfigMap API operations for autoscaler state

2023-11-16 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed FLINK-33572.
--
Release Note: Implemented via 02840b96ef3116ea95a440af4f945398900d89df and 
ab0ec081eac86619a22632616fa4c01c074ecd33.
  Resolution: Fixed

> Minimize ConfigMap API operations for autoscaler state
> --
>
> Key: FLINK-33572
> URL: https://issues.apache.org/jira/browse/FLINK-33572
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> The newly introduced flush operation after the refactoring the autoscaler 
> interfaces, optimizes the number of write operations to the underlying state 
> store. A couple of further optimizations:
> 1. Any writes should be deferred until flush is called.
> 2. The flush routine should detect whether a write is needed and writing if 
> there are no changes
> 3. Clearing state should only require one write operation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33572) Minimize ConfigMap API operations for autoscaler state

2023-11-16 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33572:
--

 Summary: Minimize ConfigMap API operations for autoscaler state
 Key: FLINK-33572
 URL: https://issues.apache.org/jira/browse/FLINK-33572
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler, Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.7.0


The newly introduced flush operation after the refactoring the autoscaler 
interfaces, optimizes the number of write operations to the underlying state 
store. A couple of further optimizations:

1. Any writes should be deferred until flush is called.
2. The flush routine should detect whether a write is needed and writing if 
there are no changes
3. Clearing state should only require one write operation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32960) Logic to log vertex exclusion only once does not work correctly

2023-11-15 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-32960.

Resolution: Later

> Logic to log vertex exclusion only once does not work correctly
> ---
>
> Key: FLINK-32960
> URL: https://issues.apache.org/jira/browse/FLINK-32960
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: kubernetes-operator-1.8.0
>
>
> As part of daeb4b6559f0d26b1b0f23be5e8230f895b0a03e we wanted to log vertex 
> exclusion only once. This logic does not work because the vertices without 
> busy time are excluded in memory for every run. So we print "No 
> busyTimeMsPerSecond metric available for {}. No scaling will be performed for 
> this vertex." for every run.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32723) FLIP-334 : Decoupling autoscaler and kubernetes and support the Standalone Autoscaler

2023-11-11 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785183#comment-17785183
 ] 

Maximilian Michels commented on FLINK-32723:


Thanks for following through with this [~fanrui]! Great work.

> FLIP-334 : Decoupling autoscaler and kubernetes and support the Standalone 
> Autoscaler
> -
>
> Key: FLINK-32723
> URL: https://issues.apache.org/jira/browse/FLINK-32723
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: kubernetes-operator-1.7.0
>
>
> This is an umbrella jira for decoupling autoscaler and kubernetes.
> https://cwiki.apache.org/confluence/x/x4qzDw



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33522) Savepoint upgrade mode fails despite the savepoint succeeding

2023-11-10 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33522:
---
Description: 
Under certain circumstances, savepoint creation can succeed but the job fails 
afterwards. One example is when there are messages being distributed by the 
source coordinator to finished tasks. This is possibly a Flink bug although 
it's not clear yet how to solve the issue.

After the savepoint succeeded Flink fails the job like this:
{noformat}
Source (1/2) 
(cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) 
switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). 
{noformat}
{noformat}
An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering 
task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', 
targetTask: Source (1/2) - execution #0
Caused by:
org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task 
is not running, but in state FINISHED
   at 
org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502)
   at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask
{noformat}

Inside the operator this is processed as:

{noformat}
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException:
 A savepoint has been created at: s3://..., but the corresponding job 
1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is 
consistent, but might have uncommitted transactions. If you want to commit the 
transaction please restart a job from this savepoint. 

  
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
  
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) 
  
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319)
 
  
org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121)
 
  
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223)
 
  
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122)
 
 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163)
  
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136)
 
  
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
 
  
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
 
  
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96) 
  
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
 
  
io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95) 
  
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
 
  
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
 
  
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
 
  
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
 
  
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
 
  
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
  
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
  java.lang.Thread.run(Thread.java:829) 
{noformat}

Subsequently we get the following because HA metadata is not available anymore. 
It has been cleared up after the terminal job failure:

{noformat}
org.apache.flink.kubernetes.operator.exception.RecoveryFailureException","message":"HA
 metadata not available to restore from last state. It is possible that the job 
has finished or terminally failed, or the configmaps have been deleted. 
{noformat}

The deployment needs to be manually restored from a savepoint.

  was:
Under certain circumstances, savepoint creation can succeed but the job fails 
afterwards. One example is when there are messages being distributed by the 
source coordinator to finished tasks. This is possibly a Flink bug although 
it's not clear how to solve this issue.

After the savepoint succeeded Flink fails the job like this:
{noformat}
Source (1/2) 

[jira] [Created] (FLINK-33522) Savepoint upgrade mode fails despite the savepoint succeeding

2023-11-10 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33522:
--

 Summary: Savepoint upgrade mode fails despite the savepoint 
succeeding
 Key: FLINK-33522
 URL: https://issues.apache.org/jira/browse/FLINK-33522
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.1, kubernetes-operator-1.6.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.7.0


Under certain circumstances, savepoint creation can succeed but the job fails 
afterwards. One example is when there are messages being distributed by the 
source coordinator to finished tasks. This is possibly a Flink bug although 
it's not clear how to solve this issue.

After the savepoint succeeded Flink fails the job like this:
{noformat}
Source (1/2) 
(cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) 
switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). 
{noformat}
{noformat}
An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering 
task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', 
targetTask: Source (1/2) - execution #0
Caused by:
org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task 
is not running, but in state FINISHED
   at 
org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502)
   at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask
{noformat}

Inside the operator this is processed as:

{noformat}
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException:
 A savepoint has been created at: s3://..., but the corresponding job 
1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is 
consistent, but might have uncommitted transactions. If you want to commit the 
transaction please restart a job from this savepoint. 

  
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
  
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) 
  
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319)
 
  
org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121)
 
  
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223)
 
  
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122)
 
 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163)
  
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136)
 
  
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
 
  
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
 
  
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96) 
  
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
 
  
io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95) 
  
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
 
  
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
 
  
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
 
  
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
 
  
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
 
  
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
  
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
  java.lang.Thread.run(Thread.java:829) 
{noformat}

Subsequently we get the following because HA metadata is not available anymore. 
It has been cleared up after the terminal job failure:

{noformat}
org.apache.flink.kubernetes.operator.exception.RecoveryFailureException","message":"HA
 metadata not available to restore from last state. It is possible that the job 
has finished or terminally failed, or the configmaps have been deleted. 
{noformat}

The deployment needs to be manually restored from a savepoint.



--
This message was sent by 

[jira] [Commented] (FLINK-33429) Metric collection during stabilization phase may error due to missing metrics

2023-11-03 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17782643#comment-17782643
 ] 

Maximilian Michels commented on FLINK-33429:


Thanks for closing!

> Metric collection during stabilization phase may error due to missing metrics
> -
>
> Key: FLINK-33429
> URL: https://issues.apache.org/jira/browse/FLINK-33429
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> The new code for the 1.7.0 release introduces metric collection during the 
> stabilization phase to allow sampling the observed true processing rate. 
> Metrics might not be fully initialized during that phase, as evident through 
> the error metrics. The following error is thrown: 
> {noformat}
> java.lang.RuntimeException: Could not find required metric 
> NUM_RECORDS_OUT_PER_SEC for 667f5d5aa757fb217b92c06f0f5d2bf2 
> {noformat}
> To prevent these errors shadowing actual errors, we should detect and ignore 
> this recoverable exception.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33429) Metric collection during stabilization phase may error due to missing metrics

2023-11-01 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33429:
--

 Summary: Metric collection during stabilization phase may error 
due to missing metrics
 Key: FLINK-33429
 URL: https://issues.apache.org/jira/browse/FLINK-33429
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler
Affects Versions: kubernetes-operator-1.7.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.7.0


The new code for the 1.7.0 release introduces metric collection during the 
stabilization phase to allow sampling the observed true processing rate. 
Metrics might not be fully initialized during that phase, as evident through 
the error metrics. The following error is thrown: 

{noformat}
java.lang.RuntimeException: Could not find required metric 
NUM_RECORDS_OUT_PER_SEC for 667f5d5aa757fb217b92c06f0f5d2bf2 
{noformat}

To prevent these errors shadowing actual errors, we should detect and ignore 
this recoverable exception.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32127) Source busy time is inaccurate in many cases

2023-10-31 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781296#comment-17781296
 ] 

Maximilian Michels commented on FLINK-32127:


A new scaling method has been merged to address the unreliable source busyness 
metric: FLINK-33306.

> Source busy time is inaccurate in many cases
> 
>
> Key: FLINK-32127
> URL: https://issues.apache.org/jira/browse/FLINK-32127
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Zhanghao Chen
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.7.0
>
>
> We found that source busy time is inaccurate in many cases. The reason is 
> that sources are usu. multi-threaded (Kafka and RocketMq for example), there 
> is a fetcher thread fetching data from data source, and a consumer thread 
> deserializes data with an blocking queue in between. A source is considered 
>  # *idle* if the consumer is blocked by fetching data from the queue
>  # *backpressured* if the consumer is blocked by writing data to downstream 
> operators
>  # *busy* otherwise
> However, this means that if the bottleneck is on the fetcher side, the 
> consumer will be often blocked by fetching data from the queue, the source 
> idle time would be high, but in fact it is busy and consumes a lot of CPU. In 
> some of our jobs, the source max busy time is only ~600 ms while it has 
> actually reached the limit.
> The bottleneck could be on the fetcher side, for example, when Kafka enables 
> zstd compression, uncompression on the consumer side could be quite heavy 
> compared to data deserialization on the consumer thread side.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32991) Some metrics from autoscaler never get registered

2023-08-31 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-32991.

Resolution: Fixed

> Some metrics from autoscaler never get registered
> -
>
> Key: FLINK-32991
> URL: https://issues.apache.org/jira/browse/FLINK-32991
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Deployment / Kubernetes
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> Not all metrics appear in the latest 1.6 release. This is because we report 
> metrics as soon as they are available and the registration code assumes that 
> they will all be available at once. In practice, some are only available 
> after sampling data multiple times. For example, TARGET_DATA_RATE is only 
> available after the source metrics have been aggregated and the lag has been 
> computed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32992) Recommended parallelism metric is a duplicate of Parallelism metric

2023-08-31 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17760934#comment-17760934
 ] 

Maximilian Michels commented on FLINK-32992:


That makes sense. Closing.

> Recommended parallelism metric is a duplicate of Parallelism metric
> ---
>
> Key: FLINK-32992
> URL: https://issues.apache.org/jira/browse/FLINK-32992
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Deployment / Kubernetes
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: kubernetes-operator-1.7.0
>
>
> The two metrics are the same. Recommended parallelism seems to have been 
> added as a way to report real-time parallelism updates before we changed all 
> metrics to be reported in real time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32992) Recommended parallelism metric is a duplicate of Parallelism metric

2023-08-31 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-32992.

Resolution: Not A Problem

> Recommended parallelism metric is a duplicate of Parallelism metric
> ---
>
> Key: FLINK-32992
> URL: https://issues.apache.org/jira/browse/FLINK-32992
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Deployment / Kubernetes
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: kubernetes-operator-1.7.0
>
>
> The two metrics are the same. Recommended parallelism seems to have been 
> added as a way to report real-time parallelism updates before we changed all 
> metrics to be reported in real time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   3   4   5   6   7   8   9   10   >