[jira] [Created] (FLINK-35414) Cancel jobs through rest api for last-state upgrades

2024-05-21 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-35414:
--

 Summary: Cancel jobs through rest api for last-state upgrades
 Key: FLINK-35414
 URL: https://issues.apache.org/jira/browse/FLINK-35414
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora


The Kubernetes operator currently always deletes the JM deployment directly 
during last-state upgrades instead of attempting any kind of graceful shutdown.

We could improve the last-state upgrade logic to cancel the job in cases where 
the JM is healthy, and then simply extract the last checkpoint info through the 
REST API like we already do for terminal job states.

This would allow the last-state upgrade mode to work even for session jobs, and 
it may also eliminate a few corner cases that can result from the current 
forceful upgrade mechanism. 
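
A rough sketch of what that could look like on top of Flink's RestClusterClient 
(the REST message classes are from flink-runtime; how the client is obtained and 
where the checkpoint path ends up are assumptions here, not the final operator design):
{code:java}
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;

public class GracefulLastStateSketch {

    // Sketch: cancel a healthy job through the REST API, then read the latest
    // completed checkpoint path from /jobs/:jobid/checkpoints, mirroring what
    // is already done for terminal job states. Null/error handling omitted.
    static String cancelAndGetLastCheckpoint(RestClusterClient<String> client, JobID jobId)
            throws Exception {
        client.cancel(jobId).get();

        CheckpointingStatisticsHeaders headers = CheckpointingStatisticsHeaders.getInstance();
        JobMessageParameters params = headers.getUnresolvedMessageParameters();
        params.jobPathParameter.resolve(jobId);

        CompletableFuture<CheckpointingStatistics> statsFuture =
                client.sendRequest(headers, params, EmptyRequestBody.getInstance());
        CheckpointingStatistics stats = statsFuture.get();
        // May be null if the job never completed a checkpoint.
        return stats.getLatestCheckpoints().getCompletedCheckpointStatistics().getExternalPath();
    }
}
{code}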



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35126) Improve checkpoint progress health check config and enable by default

2024-05-13 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-35126:
--

Assignee: Gyula Fora

> Improve checkpoint progress health check config and enable by default
> -
>
> Key: FLINK-35126
> URL: https://issues.apache.org/jira/browse/FLINK-35126
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>
> Currently the checkpoint progress health check window is configured as a Duration. 
> This makes it hard to enable by default, as a sensible value depends on the 
> checkpoint interval.
> We should rework the config and add an alternative checkpoint-interval-multiplier 
> based config which could default to 3 (default window is 3x the checkpoint interval).
> If checkpointing is not enabled in the config, the health check would of course 
> be disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35292) Set dummy savepoint path during last-state upgrade

2024-05-06 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-35292:
--

 Summary: Set dummy savepoint path during last-state upgrade
 Key: FLINK-35292
 URL: https://issues.apache.org/jira/browse/FLINK-35292
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora


Currently the operator always sets the savepoint path even if the last state (HA 
metadata) must be used. 

This can be misleading to users, as the configured savepoint path should normally 
never take effect, and it can actually lead to incorrect state being restored if 
the HA metadata is deleted by the user at the wrong moment. 

To avoid this we can set an explicit dummy savepoint path, which will prevent 
accidentally restoring from it. 
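
A minimal sketch of the idea, assuming a hypothetical dummy value and helper name 
(not the final implementation):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;

public class DummySavepointSketch {

    // Hypothetical marker value; the actual constant/name is up for discussion.
    static final String LAST_STATE_DUMMY_SAVEPOINT = "savepoint_dummy_last_state";

    // Sketch: when the upgrade must restore from HA metadata, point
    // execution.savepoint.path at a dummy location so that an accidental
    // restore from a stale savepoint path fails loudly instead of silently
    // restoring the wrong state.
    static void applyLastStateUpgradeConfig(Configuration deployConfig) {
        deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, LAST_STATE_DUMMY_SAVEPOINT);
    }
}
{code}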



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-05-03 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843254#comment-17843254
 ] 

Gyula Fora commented on FLINK-35285:


I think you make a good point here, but we have to be a bit careful in terms of 
how much key group skew we allow while scaling up or down.

When we scale down to a parallelism which doesn't result in an even key 
distribution, our computed expected throughput will be off (because it is based 
on the assumption that throughput is linearly dependent on the parallelism, 
which assumes an even key group distribution -> no data skew introduced by the 
scaling itself).

However, this is something we can actually calculate by looking at how uneven 
the key group distribution is. As long as the introduced skew is within the 
flexible target rate boundaries, we should be able to scale down (without 
expecting a "rebound"). 
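
A back-of-the-envelope version of that calculation, purely illustrative rather 
than the autoscaler's actual code:
{code:java}
public class KeyGroupSkewSketch {

    // Illustrative only: the relative key-group skew introduced by running a
    // vertex with parallelism p. When maxParallelism % p != 0, the most loaded
    // subtask gets ceil(maxParallelism / p) key groups while the average is
    // maxParallelism / p; the ratio is the expected slowdown of the bottleneck
    // subtask compared to a perfectly even distribution (1.0 == even).
    static double keyGroupSkew(int maxParallelism, int p) {
        int maxGroupsPerSubtask = (maxParallelism + p - 1) / p; // ceil
        double avgGroupsPerSubtask = (double) maxParallelism / p;
        return maxGroupsPerSubtask / avgGroupsPerSubtask;
    }

    public static void main(String[] args) {
        // Example: maxParallelism 360, scaling down to 53 gives ceil(360/53) = 7
        // vs an average of ~6.79 key groups per subtask, i.e. ~3% skew.
        System.out.println(keyGroupSkew(360, 53));
    }
}
{code}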

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but something to ensure it can make at least some progress. There is 
> another test that now fails, but just to illustrate the point:
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
>     if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p > currentParallelism)) {
>         if (maxParallelism % p == 0) {
>             return p;
>         }
>     }
> } {code}
>  
> Perhaps this is by design and not a bug, but total failure to scale down in 
> order to keep optimized key groups does not seem ideal.
>  
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35279) Support "last-state" upgrade mode for FlinkSessionJob

2024-05-02 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-35279:
--

Assignee: Alan Zhang

> Support "last-state" upgrade mode for FlinkSessionJob 
> --
>
> Key: FLINK-35279
> URL: https://issues.apache.org/jira/browse/FLINK-35279
> Project: Flink
>  Issue Type: New Feature
>Reporter: Alan Zhang
>Assignee: Alan Zhang
>Priority: Major
>
> The "last-state" upgrade mode is only supported for Flink application mode 
> today [1]; we should provide a consistent / similar user experience in Flink 
> session mode.
> [1] 
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades]
> {code:java}
> Last state upgrade mode is currently only supported for FlinkDeployments. 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35192) operator oom

2024-05-01 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842607#comment-17842607
 ] 

Gyula Fora commented on FLINK-35192:


Merged b1f4b3f35f907e1f6425ad2401dbf24309e6fb59 to main

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, 
> image-2024-04-30-17-11-24-974.png, image-2024-04-30-20-38-25-195.png, 
> image-2024-04-30-20-39-05-109.png, image-2024-04-30-20-39-34-396.png, 
> image-2024-04-30-20-41-51-660.png, image-2024-04-30-20-43-20-125.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png
>
>
> The Kubernetes operator docker process was killed by the kernel due to out of 
> memory (the time was 2024.04.03 18:16)
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> the pod memory (RSS) has been increasing slowly over the past 7 days:
>  !screenshot-1.png! 
> However, the JVM memory metrics of the operator do not show an obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35192) operator oom

2024-05-01 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-35192:
---
Fix Version/s: kubernetes-operator-1.9.0

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, 
> image-2024-04-30-17-11-24-974.png, image-2024-04-30-20-38-25-195.png, 
> image-2024-04-30-20-39-05-109.png, image-2024-04-30-20-39-34-396.png, 
> image-2024-04-30-20-41-51-660.png, image-2024-04-30-20-43-20-125.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png
>
>
> The Kubernetes operator docker process was killed by the kernel due to out of 
> memory (the time was 2024.04.03 18:16)
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> the pod memory (RSS) has been increasing slowly over the past 7 days:
>  !screenshot-1.png! 
> However, the JVM memory metrics of the operator do not show an obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35192) Kubernetes operator oom

2024-05-01 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-35192:
---
Summary: Kubernetes operator oom  (was: operator oom)

> Kubernetes operator oom
> ---
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, 
> image-2024-04-30-17-11-24-974.png, image-2024-04-30-20-38-25-195.png, 
> image-2024-04-30-20-39-05-109.png, image-2024-04-30-20-39-34-396.png, 
> image-2024-04-30-20-41-51-660.png, image-2024-04-30-20-43-20-125.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png
>
>
> The Kubernetes operator docker process was killed by the kernel due to out of 
> memory (the time was 2024.04.03 18:16)
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> the pod memory (RSS) has been increasing slowly over the past 7 days:
>  !screenshot-1.png! 
> However, the JVM memory metrics of the operator do not show an obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35192) operator oom

2024-04-29 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842249#comment-17842249
 ] 

Gyula Fora commented on FLINK-35192:


That makes sense [~bgeng777], this would be a good improvement.

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, screenshot-1.png, screenshot-2.png, 
> screenshot-3.png
>
>
> The Kubernetes operator docker process was killed by the kernel due to out of 
> memory (the time was 2024.04.03 18:16)
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> the pod memory (RSS) has been increasing slowly over the past 7 days:
>  !screenshot-1.png! 
> However, the JVM memory metrics of the operator do not show an obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish

2024-04-24 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-35157:
--

Assignee: elon_X

> Sources with watermark alignment get stuck once some subtasks finish
> 
>
> Key: FLINK-35157
> URL: https://issues.apache.org/jira/browse/FLINK-35157
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Gyula Fora
>Assignee: elon_X
>Priority: Critical
> Attachments: image-2024-04-24-21-36-16-146.png
>
>
> The current watermark alignment logic can easily get stuck if some subtasks 
> finish while others are still running.
> The reason is that once a source subtask finishes, the subtask is not 
> excluded from alignment, effectively blocking the rest of the job from making 
> progress beyond the last watermark + alignment time of the finished sources.
> This can be easily reproduced by the following simple pipeline:
> {noformat}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> DataStream<Long> s = env.fromSource(
>         new NumberSequenceSource(0, 100),
>         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                 .withTimestampAssigner((SerializableTimestampAssigner<Long>) (aLong, l) -> aLong)
>                 .withWatermarkAlignment("g1", Duration.ofMillis(10), Duration.ofSeconds(2)),
>         "Sequence Source")
>     .filter((FilterFunction<Long>) aLong -> {
>         Thread.sleep(200);
>         return true;
>     });
> s.print();
> env.execute();{noformat}
> The solution could be to send out a max watermark event once the sources 
> finish, or to exclude them from the source coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34574) Add CPU and memory size autoscaler quota

2024-04-19 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34574.
--
Fix Version/s: kubernetes-operator-1.9.0
   Resolution: Fixed

merged to main baad90088ea5b5b240186a530a79b64fb84cc77e

> Add CPU and memory size autoscaler quota
> 
>
> Key: FLINK-34574
> URL: https://issues.apache.org/jira/browse/FLINK-34574
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish

2024-04-18 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-35157:
--

 Summary: Sources with watermark alignment get stuck once some 
subtasks finish
 Key: FLINK-35157
 URL: https://issues.apache.org/jira/browse/FLINK-35157
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.18.1, 1.19.0, 1.17.2
Reporter: Gyula Fora


The current watermark alignment logic can easily get stuck if some subtasks 
finish while others are still running.

The reason is that once a source subtask finishes, the subtask is not excluded 
from alignment, effectively blocking the rest of the job from making progress 
beyond the last watermark + alignment time of the finished sources.

This can be easily reproduced by the following simple pipeline:
{noformat}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStream<Long> s = env.fromSource(
        new NumberSequenceSource(0, 100),
        WatermarkStrategy.<Long>forMonotonousTimestamps()
                .withTimestampAssigner((SerializableTimestampAssigner<Long>) (aLong, l) -> aLong)
                .withWatermarkAlignment("g1", Duration.ofMillis(10), Duration.ofSeconds(2)),
        "Sequence Source")
    .filter((FilterFunction<Long>) aLong -> {
        Thread.sleep(200);
        return true;
    });

s.print();
env.execute();{noformat}
The solution could be to send out a max watermark event once the sources finish, 
or to exclude them from the source coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31860) FlinkDeployments never finalize when namespace is deleted

2024-04-18 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-31860.
--
Fix Version/s: kubernetes-operator-1.9.0
 Assignee: Zhou JIANG  (was: Jayme Howard)
   Resolution: Fixed

merged to main 4293d58329af562e9c50216c3005b4577a289b90

> FlinkDeployments never finalize when namespace is deleted
> -
>
> Key: FLINK-31860
> URL: https://issues.apache.org/jira/browse/FLINK-31860
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>Reporter: Jayme Howard
>Assignee: Zhou JIANG
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: kubernetes-operator-1.9.0
>
>
> This appears to be a pretty straightforward issue, but I don't know the 
> codebase well enough to propose a fix.  When a FlinkDeployment is present in 
> a namespace, and the namespace is deleted, the FlinkDeployment never 
> reconciles and fails to complete its finalizer.  This leads to the namespace 
> being blocked from deletion indefinitely, requiring manual manipulation to 
> remove the finalizer on the FlinkDeployment.
>  
> Namespace conditions:
> {code:java}
> conditions:
> - lastTransitionTime: '2023-04-18T22:17:48Z'
>   message: All resources successfully discovered
>   reason: ResourcesDiscovered
>   status: 'False'
>   type: NamespaceDeletionDiscoveryFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All legacy kube types successfully parsed
>   reason: ParsedGroupVersions
>   status: 'False'
>   type: NamespaceDeletionGroupVersionParsingFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All content successfully deleted, may be waiting on finalization
>   reason: ContentDeleted
>   status: 'False'
>   type: NamespaceDeletionContentFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some resources are remaining: flinkdeployments.flink.apache.org 
> has 2
> resource instances'
>   reason: SomeResourcesRemain
>   status: 'True'
>   type: NamespaceContentRemaining
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some content in the namespace has finalizers remaining: 
> flinkdeployments.flink.apache.org/finalizer
> in 2 resource instances'
>   reason: SomeFinalizersRemain
>   status: 'True'
>   type: NamespaceFinalizersRemaining
> phase: Terminating {code}
> FlinkDeployment example (some fields redacted):
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   creationTimestamp: '2023-03-23T18:27:02Z'
>   deletionGracePeriodSeconds: 0
>   deletionTimestamp: '2023-03-23T18:27:35Z'
>   finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>   generation: 3
>   name: 
>   namespace: 
>   resourceVersion: '10565277081'
>   uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
> spec:
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: '2'
>   flinkVersion: v1_16
>   image: 
>   job:
>     args: []
>     entryClass: 
>     jarURI: 
>     parallelism: 2
>     state: running
>     upgradeMode: stateless
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 1
>       memory: 2048m
>   logConfiguration:
>     log4j-console.properties: '# This affects logging for both user code and 
> Flink
>       rootLogger.level = INFO
>       rootLogger.appenderRef.console.ref = ConsoleAppender
>       rootLogger.appenderRef.rolling.ref = RollingFileAppender
>       # Uncomment this if you want to _only_ change Flink''s logging
>       #logger.flink.name = org.apache.flink
>       #logger.flink.level = INFO
>       # The following lines keep the log level of common libraries/connectors on
>       # log level INFO. The root logger does not override this. You have to manually
>       # change the log levels here.
>       logger.akka.name = akka
>       logger.akka.level = INFO
>       logger.kafka.name= org.apache.kafka
>       logger.kafka.level = INFO
>       logger.hadoop.name = org.apache.hadoop
>       logger.hadoop.level = INFO
>       logger.zookeeper.name = org.apache.zookeeper
>       logger.zookeeper.level = INFO
>       # Log all infos to the console
>       appender.console.name = ConsoleAppender
>       appender.console.type = CONSOLE
>       appender.console.layout.type = PatternLayout
>       appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>       # Log all infos in the given rolling file
>       appender.rolling.name = RollingFileAppender
>       appender.rolling.type = RollingFile
>       appender.rolling.append = false
>       appender.rolling.fileName = ${sys:log.file}
>       appender.rolling.filePattern = ${sys:log.file}.%i

[jira] [Created] (FLINK-35126) Improve checkpoint progress health check config and enable by default

2024-04-16 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-35126:
--

 Summary: Improve checkpoint progress health check config and 
enable by default
 Key: FLINK-35126
 URL: https://issues.apache.org/jira/browse/FLINK-35126
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora


Currently the checkpoint progress health check window is configured as a 
Duration. This makes it hard to enable by default, as a sensible value depends on 
the checkpoint interval.

We should rework the config and add an alternative checkpoint-interval-multiplier 
based config which could default to 3 (default window is 3x the checkpoint 
interval).

If checkpointing is not enabled in the config, the health check would of course 
be disabled.
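
A sketch of the intended resolution logic; the surrounding option names and 
defaults are placeholders, not final config keys:
{code:java}
import java.time.Duration;
import java.util.Optional;

public class CheckpointProgressWindowSketch {

    // Sketch: resolve the health check window. An explicitly configured window
    // wins; otherwise fall back to multiplier x checkpoint interval (proposed
    // default 3); if checkpointing is not configured, the check stays disabled.
    static Optional<Duration> resolveWindow(
            Optional<Duration> explicitWindow,
            Optional<Duration> checkpointInterval,
            int intervalMultiplier) {
        if (explicitWindow.isPresent()) {
            return explicitWindow;
        }
        return checkpointInterval.map(interval -> interval.multipliedBy(intervalMultiplier));
    }
}
{code}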



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35123) Flink Kubernetes Operator should not do deleteHAData

2024-04-16 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837610#comment-17837610
 ] 

Gyula Fora commented on FLINK-35123:


I agree that if the REST API is accessible we could call shutdown and not touch 
the HA metadata. But there are some cases when you need to delete the HA metadata 
explicitly:
 - the cluster is not in a healthy state (REST API not available)
 - the job was previously suspended with last-state upgrade mode, where the HA 
metadata is left behind

Also, with the Kubernetes HA configuration, which is much more common than ZK, the 
HA metadata cleanup is much faster than anything else. It's a simple ConfigMap 
deletion.
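
For reference, that Kubernetes HA cleanup boils down to deleting a few labelled 
ConfigMaps; a rough fabric8 sketch, where the label selector is an assumption about 
how Flink tags its HA ConfigMaps:
{code:java}
import java.util.Map;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class HaConfigMapCleanupSketch {

    // Sketch only: delete the HA ConfigMaps of one cluster. The label selector
    // is an assumption about the labels Flink puts on its Kubernetes HA
    // ConfigMaps; verify against the actual deployment before using.
    static void deleteHaConfigMaps(String namespace, String clusterId) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            client.configMaps()
                    .inNamespace(namespace)
                    .withLabels(Map.of("app", clusterId, "configmap-type", "high-availability"))
                    .delete();
        }
    }
}
{code}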

> Flink Kubernetes Operator should not do deleteHAData 
> -
>
> Key: FLINK-35123
> URL: https://issues.apache.org/jira/browse/FLINK-35123
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0, kubernetes-operator-1.8.0
>Reporter: Fei Feng
>Priority: Major
> Attachments: image-2024-04-16-15-56-33-426.png
>
>
> We use Flink HA based on ZooKeeper. When a lot of FlinkDeployments were being 
> deleted, the operator spent too much time in cleanHaData; the jstack showed that 
> the reconcile thread was hanging on disconnecting from ZooKeeper. This made 
> deleting FlinkDeployments slow. 
> !image-2024-04-16-15-56-33-426.png|width=502,height=263!
>  
> I don't understand why the Flink Kubernetes operator needs to clean HA data, as 
> [~aitozi] commented in PR [FLINK-26336 Call cancel on deletion & clean up 
> configmaps as well 
> #28|https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815968841]
> {quote}it's a bit of out of scope of the operator responsibility or ability
> {quote}
> and I totally agree with his point. 
> And I want to know why we don't call the RestClusterClient#shutDownCluster 
> interface, which is:
> 1. more graceful and reasonable (the operator need not care whether the Flink app 
> enables HA or not); 2. compatible across Flink versions.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35108) Deployment recovery is triggered on terminal jobs after jm shutdown ttl

2024-04-15 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-35108.
--
Fix Version/s: kubernetes-operator-1.9.0
   Resolution: Fixed

merged to main be3b79b64440065c9ae9d3eb0267e412d81ef67e

> Deployment recovery is triggered on terminal jobs after jm shutdown ttl
> ---
>
> Key: FLINK-35108
> URL: https://issues.apache.org/jira/browse/FLINK-35108
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0, kubernetes-operator-1.8.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Critical
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> The deployment recovery mechanism is incorrectly triggered for terminal jobs 
> once the JM deployment is deleted after the TTL period. 
> This causes jobs to be resubmitted. This affects only batch jobs.
> The workaround is to set kubernetes.operator.jm-deployment-recovery.enabled: false 
> for batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35108) Deployment recovery is triggered on terminal jobs after jm shutdown ttl

2024-04-15 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-35108:
--

 Summary: Deployment recovery is triggered on terminal jobs after 
jm shutdown ttl
 Key: FLINK-35108
 URL: https://issues.apache.org/jira/browse/FLINK-35108
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.8.0, kubernetes-operator-1.7.0
Reporter: Gyula Fora
Assignee: Gyula Fora


The deployment recovery mechanism is incorrectly triggered for terminal jobs 
once the JM deployment is deleted after the TTL period. 

This causes jobs to be resubmitted. This affects only batch jobs.

The workaround is to set kubernetes.operator.jm-deployment-recovery.enabled: false 
for batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836106#comment-17836106
 ] 

Gyula Fora edited comment on FLINK-34704 at 4/11/24 10:41 AM:
--

I agree with [~pnowojski] here, the currently blocked element would be lost in 
the checkpoint. But [~Zakelly] also has a valid point.

I have played around with this, and there is a simple optimisation that can be 
made for the async operator under certain circumstances.

If the AWOP is the head of the operator chain (no upstream), we could actually 
checkpoint during yielding, but we would also need to checkpoint the currently 
processed element as part of the buffer (temporarily increasing the size of the 
buffer by 1).

This is still related to the other ticket in the sense that we need to get the 
checkpoint trigger during yield, but it needs custom logic for the AWOP to 
allow checkpointing while being blocked on the full buffer.


was (Author: gyfora):
I agree with [~pnowojski] here, the currently blocked element would be lost in 
the checkpoint.

I have played around with this, and there is a simple optimisation that can be 
made for the async operator under certain circumstances.

If the AWOP is the head of the operator chain (no upstream), we could actually 
checkpoint during yielding, but we would also need to checkpoint the currently 
processed element as part of the buffer (temporarily increasing the size of the 
buffer by 1).

This is still related to the other ticket in the sense that we need to get the 
checkpoint trigger during yield, but it needs custom logic for the AWOP to 
allow checkpointing while being blocked on the full buffer.

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836107#comment-17836107
 ] 

Gyula Fora commented on FLINK-34704:


So limiting the optimisation to the head of the operator chain is somewhat 
restrictive, but the improvement in this particular scenario is actually huge and 
may make or break some specialised use-cases, so it is probably still worth 
considering after FLINK-35051.

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836106#comment-17836106
 ] 

Gyula Fora commented on FLINK-34704:


I agree with [~pnowojski] here, the currently blocked element would be lost in 
the checkpoint.

I have played around with this, and there is a simple optimisation that can be 
made for the async operator under certain circumstances.

If the AWOP is the head of the operator chain (no upstream), we could actually 
checkpoint during yielding, but we would also need to checkpoint the currently 
processed element as part of the buffer (temporarily increasing the size of the 
buffer by 1).

This is still related to the other ticket in the sense that we need to get the 
checkpoint trigger during yield, but it needs custom logic for the AWOP to 
allow checkpointing while being blocked on the full buffer.

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34947) Reduce JM scale down timeout

2024-03-27 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-34947:
---
Description: 
We introduced logic to scale down the JobManager before the task managers are 
killed, to have a more graceful shutdown sequence.

Currently this is always done in native mode, but it does not make sense during 
orphan deletion propagation, when the intention is to delete the FlinkDeployment 
but not the underlying resources.

Furthermore, instead of using the entire deletion timeout we should only use a 
portion of it, as this is an optional step and we should always keep enough time 
for the TM shutdown.

  was:
We introduced logic to scale down the JobManager before the task managers are 
killed, to have a more graceful shutdown sequence.

Instead of using the entire deletion timeout we should only use a portion of it, 
as this is an optional step and we should always keep enough time for the TM 
shutdown.


> Reduce JM scale down timeout
> 
>
> Key: FLINK-34947
> URL: https://issues.apache.org/jira/browse/FLINK-34947
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.8.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> We introduced logic to scale down the JobManager before the task managers 
> are killed, to have a more graceful shutdown sequence.
> Currently this is always done in native mode, but it does not make sense 
> during orphan deletion propagation, when the intention is to delete the 
> FlinkDeployment but not the underlying resources.
> Furthermore, instead of using the entire deletion timeout we should only use 
> a portion of it, as this is an optional step and we should always keep enough 
> time for the TM shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34947) Reduce JM scale down timeout

2024-03-27 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-34947:
---
Description: 
We introduced logic to scale down the JobManager before the task managers are 
killed, to have a more graceful shutdown sequence.

Instead of using the entire deletion timeout we should only use a portion of it, 
as this is an optional step and we should always keep enough time for the TM 
shutdown.

  was:
We introduced logic to scale down the JobManager before the task managers are 
killed, to have a more graceful shutdown sequence.

Currently this is always done in native mode, but it does not make sense during 
orphan deletion propagation, when the intention is to delete the FlinkDeployment 
but not the underlying resources.

Furthermore, instead of using the entire deletion timeout we should only use a 
portion of it, as this is an optional step and we should always keep enough time 
for the TM shutdown.


> Reduce JM scale down timeout
> 
>
> Key: FLINK-34947
> URL: https://issues.apache.org/jira/browse/FLINK-34947
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.8.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> We introduced logic to scale down the JobManager before the task managers 
> are killed, to have a more graceful shutdown sequence.
> Instead of using the entire deletion timeout we should only use a portion of 
> it, as this is an optional step and we should always keep enough time for the 
> TM shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34947) Reduce JM scale down timeout

2024-03-27 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-34947:
---
Summary: Reduce JM scale down timeout  (was: Do not scale down JM in Orphan 
deletion propagation and reduce timeout)

> Reduce JM scale down timeout
> 
>
> Key: FLINK-34947
> URL: https://issues.apache.org/jira/browse/FLINK-34947
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.8.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> We introduced logic to scale down the JobManager before the task managers 
> are killed, to have a more graceful shutdown sequence.
> Currently this is always done in native mode, but it does not make sense 
> during orphan deletion propagation, when the intention is to delete the 
> FlinkDeployment but not the underlying resources.
> Furthermore, instead of using the entire deletion timeout we should only use 
> a portion of it, as this is an optional step and we should always keep enough 
> time for the TM shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34947) Do not scale down JM in Orphan deletion propagation and reduce timeout

2024-03-27 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-34947:
--

 Summary: Do not scale down JM in Orphan deletion propagation and 
reduce timeout
 Key: FLINK-34947
 URL: https://issues.apache.org/jira/browse/FLINK-34947
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.8.0
Reporter: Gyula Fora
Assignee: Gyula Fora
 Fix For: kubernetes-operator-1.9.0


We introduced logic to scale down the JobManager before the task managers are 
killed, to have a more graceful shutdown sequence.

Currently this is always done in native mode, but it does not make sense during 
orphan deletion propagation, when the intention is to delete the FlinkDeployment 
but not the underlying resources.

Furthermore, instead of using the entire deletion timeout we should only use a 
portion of it, as this is an optional step and we should always keep enough time 
for the TM shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32529) Optional startup probe for JM deployment

2024-03-26 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830805#comment-17830805
 ] 

Gyula Fora commented on FLINK-32529:


[~tbnguyen1407] please open a separate Jira ticket for this. If you could also 
work on a fix that would be a nice improvement :) 

> Optional startup probe for JM deployment
> 
>
> Key: FLINK-32529
> URL: https://issues.apache.org/jira/browse/FLINK-32529
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.6.0
>
>
> There are certain cases where the JM enters a startup crash loop for example 
> due to incorrect HA config setup. With the current operator logic these cases 
> require manual user intervention as we don't have HA metadata available for 
> the last checkpoint and it also seems like the JM actually started already.
> To solve this properly we suggest adding a default JM startup probe that 
> queries the rest api (/config) endpoint. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34927) Translate flink-kubernetes-operator documentation

2024-03-25 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830364#comment-17830364
 ] 

Gyula Fora commented on FLINK-34927:


I think this would be great, I won't be able to review the content though :D 

> Translate flink-kubernetes-operator documentation
> -
>
> Key: FLINK-34927
> URL: https://issues.apache.org/jira/browse/FLINK-34927
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Caican Cai
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> Currently, the flink-kubernetes-operator documentation is only in English. I 
> hope it can be translated into Chinese so that more Chinese users can use it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34907) jobRunningTs should be the timestamp that all tasks are running

2024-03-21 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829467#comment-17829467
 ] 

Gyula Fora commented on FLINK-34907:


Similar to the other Jira you opened, this only seems to affect the standalone 
autoscaler logic.

> jobRunningTs should be the timestamp that all tasks are running
> ---
>
> Key: FLINK-34907
> URL: https://issues.apache.org/jira/browse/FLINK-34907
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> Currently, we consider the timestamp at which the JobStatus changes to RUNNING as 
> jobRunningTs. But the JobStatus will be RUNNING once the job starts scheduling, 
> so it doesn't mean all tasks are running. 
> This makes isStabilizing and the restart time estimation inaccurate.
> Solution: jobRunningTs should be the timestamp at which all tasks are running.
> It can be obtained from the SubtasksTimesHeaders REST API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34228) Add long UTF serializer/deserializer

2024-03-19 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34228.
--
Fix Version/s: 1.20.0
   Resolution: Fixed

merged to master f75935245799471ddf025d2bab0d0d212e79088e

> Add long UTF serializer/deserializer
> 
>
> Key: FLINK-34228
> URL: https://issues.apache.org/jira/browse/FLINK-34228
> Project: Flink
>  Issue Type: Improvement
>Reporter: Peter Vary
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> DataOutputSerializer.writeUTF has a hard limit on the length of the string 
> (64k). This is inherited from the DataOutput.writeUTF method, where the JDK 
> specifically defines this limit [1].
> For our use-case we need to be able to serialize longer UTF strings, so we will 
> need to define a writeLongUTF method with a similar specification to writeUTF, 
> but without the length limit.
> Based on the discussion on the mailing list, this is a good additional 
> serialization utility to Flink [2]
> [1] - 
> https://docs.oracle.com/javase/8/docs/api/java/io/DataOutput.html#writeUTF-java.lang.String-
> [2] - https://lists.apache.org/thread/ocm6cj0h8o3wbwo7fz2l1b4odss750rk
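
A minimal sketch of what such a method pair could look like, using plain java.io and 
standard UTF-8 rather than DataOutput's modified UTF-8 (illustrative only, not the 
API that was merged):
{code:java}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LongUtfSketch {

    // Sketch: like writeUTF, but prefix the payload with a 4-byte length
    // instead of an unsigned short, so strings whose encoded form exceeds
    // 64k bytes are allowed.
    static void writeLongUTF(DataOutput out, String value) throws IOException {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    static String readLongUTF(DataInput in) throws IOException {
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
{code}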



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34728) operator does not need to upload and download the jar when deploying session job

2024-03-19 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828222#comment-17828222
 ] 

Gyula Fora commented on FLINK-34728:


That makes sense; however, this is more a ticket for Flink core / web submission 
at this point. A similar improvement could be made for supporting job 
submissions without jars (from the cluster classpath). A FLIP would be 
necessary for making these changes to the REST job submission.

> operator does not need to upload and download the jar when deploying session 
> job
> 
>
> Key: FLINK-34728
> URL: https://issues.apache.org/jira/browse/FLINK-34728
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0, kubernetes-operator-1.6.0, 
> kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Priority: Major
> Attachments: image-2024-03-19-15-59-20-933.png
>
>
> Problem:
> By reading the source code of the session job's first reconciliation in the 
> session mode of the Flink Kubernetes operator, a clear bottleneck can be 
> identified. When submitting a session job, the operator needs to first download 
> the job jar from the jarURL to the local storage of the Kubernetes pod, then 
> upload the jar to the job manager through the `/jars/upload` REST API, and 
> finally call the `/jars/:jarid/run` REST API to launch the job.
> In this process, the operator needs to first download the jar and then upload 
> it. When multiple jobs are submitted to the session cluster simultaneously, the 
> operator can become a single bottleneck, as it may be limited by the network 
> traffic or other resource constraints of the operator pod.
>  
> [https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L824]
> !image-2024-03-19-15-59-20-933.png|width=548,height=432!
>  
> Solution:
> We can modify the job submission process in session mode. The jobmanager can 
> provide a `/jars/run` REST API that supports downloading the job jar itself, so 
> the operator only needs to send a REST request to submit the job, without 
> downloading and uploading the job jar. In this way, the submission pressure on 
> the operator can be distributed to each job manager. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34726) Flink Kubernetes Operator has some room for optimizing performance.

2024-03-19 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828210#comment-17828210
 ] 

Gyula Fora commented on FLINK-34726:


Thanks for the detailed analysis [~Fei Feng]. You are completely right that we 
don't optimise the rest client usage, and that may add a significant overhead. 
We have done a similar optimisation in the past for config access/generation by 
using the FlinkResourceContext class. 

We could probably move the rest client generation logic there instead of hiding 
it under the FlinkService completely. This will however be a bigger change, as 
it will affect the methods of the FlinkService interface as well.

It sounds a bit strange that getSecondaryResource is so expensive, as that should 
be served from a cache. We should look into why it's expensive in the first 
place, because passing the FlinkDeployment objects around will make the code a 
bit more complicated, but I guess that could also be hidden under the 
FlinkSessionJobContext.
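
A sketch of the kind of reuse being suggested: create the REST client once per 
reconcile loop and share it across the observe calls. Class and method names here 
are hypothetical, not the operator's actual interfaces:
{code:java}
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;

public class RestClientReuseSketch {

    private final Configuration observeConfig;
    private final String clusterId;
    private RestClusterClient<String> cachedClient;

    RestClientReuseSketch(Configuration observeConfig, String clusterId) {
        this.observeConfig = observeConfig;
        this.clusterId = clusterId;
    }

    // Sketch: create the REST client lazily once per reconcile loop and reuse
    // it for the getClusterInfo/listJobs/getTaskManagerInfo style calls.
    RestClusterClient<String> getOrCreateRestClient() throws Exception {
        if (cachedClient == null) {
            cachedClient = new RestClusterClient<>(observeConfig, clusterId);
        }
        return cachedClient;
    }

    // Close once at the end of the reconcile loop instead of once per request.
    void closeAfterReconcile() {
        if (cachedClient != null) {
            cachedClient.close();
            cachedClient = null;
        }
    }
}
{code}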

> Flink Kubernetes Operator has some room for optimizing performance.
> ---
>
> Key: FLINK-34726
> URL: https://issues.apache.org/jira/browse/FLINK-34726
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0, kubernetes-operator-1.6.0, 
> kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Priority: Major
> Attachments: operator_no_submit_no_kill.flamegraph.html
>
>
> When there is a huge number of FlinkDeployments and FlinkSessionJobs in a 
> Kubernetes cluster, there will be a significant delay between an event being 
> submitted to the reconcile thread pool and the event being processed. 
> This is our test: we gave the operator enough resources (cpu: 10 cores, memory: 
> 20g, reconcile thread pool size of 200) and we first deployed 1 jobs (one 
> FlinkDeployment and one SessionJob per job), then we ran submit/delete job 
> tests. We found that:
> 1. it took about 2 min between creating the new FlinkDeployment and 
> FlinkSessionJob CRs in k8s and the Flink job being submitted to the jobmanager.
> 2. it took about 1 min between deleting a FlinkDeployment and FlinkSessionJob CR 
> and the Flink job and session cluster being cleared.
>  
> I used async-profiler to get a flamegraph when there is a huge number of 
> FlinkDeployments and FlinkSessionJobs. I found two obvious areas for 
> optimization:
> 1. For FlinkDeployment: in the observe step, we call 
> AbstractFlinkService.getClusterInfo/listJobs/getTaskManagerInfo; every time 
> we call these methods we need to create a RestClusterClient, send requests and 
> close it. I think we should reuse the RestClusterClient as much as possible to 
> avoid frequently creating objects and to reduce GC pressure.
> 2. For FlinkSessionJob (this issue is more obvious): in the whole reconcile 
> loop, we call getSecondaryResource 5 times to get the FlinkDeployment resource 
> info. Based on my current understanding of the Flink Operator, I think we do 
> not need to call it 5 times in a single reconcile loop, calling it once is 
> enough. If so, we could save 30% cpu usage (every getSecondaryResource costs 
> 6% cpu usage).
> [^operator_no_submit_no_kill.flamegraph.html]
> I hope we can discuss solutions to address this problem together. I'm very 
> willing to optimize and resolve this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-13 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17826054#comment-17826054
 ] 

Gyula Fora commented on FLINK-34655:


[~mxm] I would be hesitant to try to backport these changes to 1.15/1.16; the 
community doesn't generally backport new features to older releases, and these 
are already out of the supported version scope of Flink core anyway. 

For 1.15 we would have to backport the aggregated metrics changes, which are not 
backward compatible with the current 1.15 REST API, so that is not possible to do.

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> flink-kubernetes-operator is committed to supporting the latest 4 Flink minor 
> versions, and the autoscaler is a part of flink-kubernetes-operator. Currently, 
> the latest 4 Flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But the autoscaler doesn't work for Flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties to IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for the autoscaler [1]
> * Flink's RestClient doesn't allow missing any property when deserializing the 
> JSON
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> - [[FLINK-34655](https://issues.apache.org/jira/browse/FLINK-34655)] Copy 
> IOMetricsInfo to the flink-autoscaler-standalone module
> - Remove them once 1.15 is no longer supported
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825657#comment-17825657
 ] 

Gyula Fora edited comment on FLINK-34655 at 3/12/24 12:12 PM:
--

Also this issue is fixed in the Kubernetes-operator package where we have an 
override version of IoMetricsInfo


was (Author: gyfora):
Also this issue is fixed in the Kubernetes-operator package where we have an 
override version of JobDetailsInfo

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-kubernetes-operator is committed to supporting the latest 4 Flink minor 
> versions, and the autoscaler is a part of flink-kubernetes-operator. Currently, 
> the latest 4 Flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But the autoscaler doesn't work for Flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties to IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for the autoscaler [1]
> * Flink's RestClient doesn't allow missing any property when deserializing the 
> JSON
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> The Flink side should support ignoring unknown properties.
> FLINK-33268 already does this. But when I try running the autoscaler with a 
> flink-1.15 job, it still doesn't work, because the properties IOMetricsInfo 
> added are of primitive type.
> It should disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES as well. 
> (Not sure whether it should be a separate FLIP or it can be a part of 
> FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from the master branch (which 
> includes FLINK-33268) to the flink-autoscaler module. (The copied classes will 
> be loaded first.)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in the copied class.
> Based on these 2 steps, flink-1.15 works well with the autoscaler. (I tried it 
> locally.)
> After DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper is disabled and the corresponding code is 
> released on the Flink side, flink-kubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance
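
For reference, the Jackson switches the short-term fix refers to are just the 
following; how they get wired into the copied RestMapperUtils is not shown here:
{code:java}
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FlexibleMapperSketch {

    // Sketch: an ObjectMapper that tolerates unknown fields and missing
    // primitive fields, so a post-1.15 client can still parse the JSON
    // returned by a 1.15 cluster.
    static ObjectMapper flexibleObjectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        mapper.disable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
        return mapper;
    }
}
{code}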



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825657#comment-17825657
 ] 

Gyula Fora commented on FLINK-34655:


Also this issue is fixed in the Kubernetes-operator package where we have an 
override version of JobDetailsInfo

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-kubernetes-operator is committed to supporting the latest 4 Flink minor 
> versions, and the autoscaler is a part of flink-kubernetes-operator. Currently, 
> the latest 4 Flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But the autoscaler doesn't work for Flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties to IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for the autoscaler [1]
> * Flink's RestClient doesn't allow missing any property when deserializing the 
> JSON
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> The Flink side should support ignoring unknown properties.
> FLINK-33268 already does this. But when I try running the autoscaler with a 
> flink-1.15 job, it still doesn't work, because the properties IOMetricsInfo 
> added are of primitive type.
> It should disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES as well. 
> (Not sure whether it should be a separate FLIP or it can be a part of 
> FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from the master branch (which 
> includes FLINK-33268) to the flink-autoscaler module. (The copied classes will 
> be loaded first.)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in the copied class.
> Based on these 2 steps, flink-1.15 works well with the autoscaler. (I tried it 
> locally.)
> After DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper is disabled and the corresponding code is 
> released on the Flink side, flink-kubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-34655:
---
Priority: Major  (was: Blocker)

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-kubernetes-operator is committed to supporting the latest 4 Flink minor 
> versions, and the autoscaler is a part of flink-kubernetes-operator. Currently, 
> the latest 4 Flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But the autoscaler doesn't work for Flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties to IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for the autoscaler [1]
> * Flink's RestClient doesn't allow any property to be missing when 
> deserializing the JSON
> That means the RestClient of versions after 1.15 cannot fetch JobDetailsInfo 
> for 1.15 jobs.
> h2. How to fix it properly?
> The Flink side should support ignoring unknown properties.
> FLINK-33268 already does this, but when I ran the autoscaler against a 
> flink-1.15 job it still didn't work, because the properties IOMetricsInfo 
> added are of primitive type.
> DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES should be disabled as well. 
> (Not sure whether this should be a separate FLIP or a part of FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from the master branch 
> (which includes FLINK-33268) into the flink-autoscaler module. (The copied 
> classes will be loaded first.)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in the copied class.
> With these 2 steps, flink-1.15 works well with the autoscaler (I tried it 
> locally).
> Once DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES is disabled in 
> RestMapperUtils#flexibleObjectMapper and the corresponding code is released 
> on the Flink side, flink-kubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825656#comment-17825656
 ] 

Gyula Fora commented on FLINK-34655:


But the vertex parallelism overrides feature was introduced in 1.17 so the 
autoscaler never really officially supported anything before that. What do you 
think [~mxm] ?

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-kubernetes-operator is committed to supporting the latest 4 Flink minor 
> versions, and the autoscaler is a part of flink-kubernetes-operator. Currently, 
> the latest 4 Flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But the autoscaler doesn't work for Flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties to IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for the autoscaler [1]
> * Flink's RestClient doesn't allow any property to be missing when 
> deserializing the JSON
> That means the RestClient of versions after 1.15 cannot fetch JobDetailsInfo 
> for 1.15 jobs.
> h2. How to fix it properly?
> The Flink side should support ignoring unknown properties.
> FLINK-33268 already does this, but when I ran the autoscaler against a 
> flink-1.15 job it still didn't work, because the properties IOMetricsInfo 
> added are of primitive type.
> DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES should be disabled as well. 
> (Not sure whether this should be a separate FLIP or a part of FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from the master branch 
> (which includes FLINK-33268) into the flink-autoscaler module. (The copied 
> classes will be loaded first.)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in the copied class.
> With these 2 steps, flink-1.15 works well with the autoscaler (I tried it 
> locally).
> Once DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES is disabled in 
> RestMapperUtils#flexibleObjectMapper and the corresponding code is released 
> on the Flink side, flink-kubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825655#comment-17825655
 ] 

Gyula Fora commented on FLINK-34655:


The bigger issue is that aggregated busy time metrics are not part of Flink 1.15

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> flink-kubernetes-operator is committed to supporting the latest 4 Flink minor 
> versions, and the autoscaler is a part of flink-kubernetes-operator. Currently, 
> the latest 4 Flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But the autoscaler doesn't work for Flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties to IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for the autoscaler [1]
> * Flink's RestClient doesn't allow any property to be missing when 
> deserializing the JSON
> That means the RestClient of versions after 1.15 cannot fetch JobDetailsInfo 
> for 1.15 jobs.
> h2. How to fix it properly?
> The Flink side should support ignoring unknown properties.
> FLINK-33268 already does this, but when I ran the autoscaler against a 
> flink-1.15 job it still didn't work, because the properties IOMetricsInfo 
> added are of primitive type.
> DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES should be disabled as well. 
> (Not sure whether this should be a separate FLIP or a part of FLIP-401 [2].)
> h2. How to fix it in the short term?
> 1. Copy the latest RestMapperUtils and RestClient from the master branch 
> (which includes FLINK-33268) into the flink-autoscaler module. (The copied 
> classes will be loaded first.)
> 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in 
> RestMapperUtils#flexibleObjectMapper in the copied class.
> With these 2 steps, flink-1.15 works well with the autoscaler (I tried it 
> locally).
> Once DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES is disabled in 
> RestMapperUtils#flexibleObjectMapper and the corresponding code is released 
> on the Flink side, flink-kubernetes-operator can remove these 2 copied 
> classes.
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34524) Scale down JobManager deployment to 0 before deletion

2024-03-11 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34524.
--
Fix Version/s: kubernetes-operator-1.8.0
   Resolution: Fixed

merged to main ede1a610b3375d31a2e82287eec67ace70c4c8df

> Scale down JobManager deployment to 0 before deletion
> -
>
> Key: FLINK-34524
> URL: https://issues.apache.org/jira/browse/FLINK-34524
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> We recently improved the JM deployment deletion mechanism; however, it seems 
> that task manager pod deletion can sometimes get stuck for a couple of 
> minutes in native mode if we simply try to delete everything at once.
> It speeds up the process and leads to cleaner shutdown if we scale down the 
> JM deployment to 0 (shutting down the JM pods first) and then perform the 
> deletion.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31860) FlinkDeployments never finalize when namespace is deleted

2024-03-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825414#comment-17825414
 ] 

Gyula Fora commented on FLINK-31860:


I don’t really know how that would be possible, but I welcome any 
recommendation / solution.

> FlinkDeployments never finalize when namespace is deleted
> -
>
> Key: FLINK-31860
> URL: https://issues.apache.org/jira/browse/FLINK-31860
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>Reporter: Jayme Howard
>Assignee: Jayme Howard
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> This appears to be a pretty straightforward issue, but I don't know the 
> codebase well enough to propose a fix.  When a FlinkDeployment is present in 
> a namespace, and the namespace is deleted, the FlinkDeployment never 
> reconciles and fails to complete its finalizer.  This leads to the namespace 
> being blocked from deletion indefinitely, requiring manual manipulation to 
> remove the finalizer on the FlinkDeployment.
>  
> Namespace conditions:
> {code:java}
> conditions:
> - lastTransitionTime: '2023-04-18T22:17:48Z'
>   message: All resources successfully discovered
>   reason: ResourcesDiscovered
>   status: 'False'
>   type: NamespaceDeletionDiscoveryFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All legacy kube types successfully parsed
>   reason: ParsedGroupVersions
>   status: 'False'
>   type: NamespaceDeletionGroupVersionParsingFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All content successfully deleted, may be waiting on finalization
>   reason: ContentDeleted
>   status: 'False'
>   type: NamespaceDeletionContentFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some resources are remaining: flinkdeployments.flink.apache.org 
> has 2
> resource instances'
>   reason: SomeResourcesRemain
>   status: 'True'
>   type: NamespaceContentRemaining
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some content in the namespace has finalizers remaining: 
> flinkdeployments.flink.apache.org/finalizer
> in 2 resource instances'
>   reason: SomeFinalizersRemain
>   status: 'True'
>   type: NamespaceFinalizersRemaining
> phase: Terminating {code}
> FlinkDeployment example (some fields redacted):
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   creationTimestamp: '2023-03-23T18:27:02Z'
>   deletionGracePeriodSeconds: 0
>   deletionTimestamp: '2023-03-23T18:27:35Z'
>   finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>   generation: 3
>   name: 
>   namespace: 
>   resourceVersion: '10565277081'
>   uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
> spec:
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: '2'
>   flinkVersion: v1_16
>   image: 
>   job:
>     args: []
>     entryClass: 
>     jarURI: 
>     parallelism: 2
>     state: running
>     upgradeMode: stateless
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 1
>       memory: 2048m
>   logConfiguration:
>     log4j-console.properties: '# This affects logging for both user code and 
> Flink      rootLogger.level = INFO      rootLogger.appenderRef.console.ref = 
> ConsoleAppender      rootLogger.appenderRef.rolling.ref = RollingFileAppender 
>      # Uncomment this if you want to _only_ change Flink''s logging      
> #logger.flink.name = org.apache.flink      #logger.flink.level = INFO      # 
> The following lines keep the log level of common libraries/connectors on      
> # log level INFO. The root logger does not override this. You have to 
> manually      # change the log levels here.      logger.akka.name = akka      
> logger.akka.level = INFO      logger.kafka.name= org.apache.kafka      
> logger.kafka.level = INFO      logger.hadoop.name = org.apache.hadoop      
> logger.hadoop.level = INFO      logger.zookeeper.name = org.apache.zookeeper  
>     logger.zookeeper.level = INFO      # Log all infos to the console      
> appender.console.name = ConsoleAppender      appender.console.type = CONSOLE  
>     appender.console.layout.type = PatternLayout      
> appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
>       - %m%n      # Log all infos in the given rolling file      
> appender.rolling.name = RollingFileAppender      appender.rolling.type = 
> RollingFile      appender.rolling.append = false      
> appender.rolling.fileName = ${sys:log.file}      appender.rolling.filePattern 
> = ${sys:log.file}.%i      appender.rolling.layout.type = PatternLayout      
> appender.rolling.layout.pattern = %d{-MM-dd 

[jira] [Commented] (FLINK-34563) Autoscaling decision improvement

2024-03-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825380#comment-17825380
 ] 

Gyula Fora commented on FLINK-34563:


Copying over my comment from GitHub for completeness:



I have some concerns about this change:
 # It doesn't work with custom slot sharing configuration, which is very common.
 # It provides almost no benefit with large taskmanager sizes / a low number of 
task slots.
 # It goes against some basic design philosophy in the autoscaler, namely that 
we do not scale vertices beyond their target capacity. It ties into 
[@mxm|https://github.com/mxm]'s question of why the logic wouldn't apply to all 
vertices.

Taking that one step further, why don't we scale all vertices to the same 
parallelism at that point? That would naturally cause more resource usage and 
less throughput. By the same logic I don't think we should scale even the 
largest ones further.

> Autoscaling decision improvement
> 
>
> Key: FLINK-34563
> URL: https://issues.apache.org/jira/browse/FLINK-34563
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Yang LI
>Priority: Minor
>  Labels: pull-request-available
>
> Hi, I'd like to propose a minor improvement based on my autoscaling 
> experiments. The concept revolves around identifying the vertex with the 
> highest level of parallelism and matching it to the maximum parallelism 
> supported by our task manager.
> The primary goal of this enhancement is to prevent any task slots from 
> remaining unused after the Flink autoscaler performs a rescaling operation. 
> I've already tested this modification in a custom build of the operator, 
> excluding the memory tuning feature. However, I believe it could be 
> beneficial, especially in scenarios where the memory tuning feature is not 
> enabled.
> And I have prepared this small pr also :)
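
A minimal sketch of the proposed idea, under the assumption that the goal is 
simply to round the largest vertex parallelism up to a multiple of 
taskmanager.numberOfTaskSlots (the names are illustrative and this is not the 
code from the PR):

{code:java}
public class SlotAlignmentSketch {

    /**
     * Round the proposed parallelism of the most parallel vertex up to the
     * next multiple of the slots per TaskManager, so the last TaskManager
     * requested for the job has no idle slots.
     */
    public static int alignToTaskSlots(int proposedParallelism, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        int remainder = proposedParallelism % slotsPerTaskManager;
        return remainder == 0
                ? proposedParallelism
                : proposedParallelism + (slotsPerTaskManager - remainder);
    }

    public static void main(String[] args) {
        // With 2 slots per TaskManager, a proposed parallelism of 7 would
        // leave one slot unused; rounding up to 8 fills all 4 TaskManagers.
        System.out.println(alignToTaskSlots(7, 2)); // prints 8
    }
}
{code}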



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31860) FlinkDeployments never finalize when namespace is deleted

2024-03-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825379#comment-17825379
 ] 

Gyula Fora commented on FLINK-31860:


I am not aware of any solution from the kubernetes / josdk side [~klam-shop] 

> FlinkDeployments never finalize when namespace is deleted
> -
>
> Key: FLINK-31860
> URL: https://issues.apache.org/jira/browse/FLINK-31860
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>Reporter: Jayme Howard
>Assignee: Jayme Howard
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> This appears to be a pretty straightforward issue, but I don't know the 
> codebase well enough to propose a fix.  When a FlinkDeployment is present in 
> a namespace, and the namespace is deleted, the FlinkDeployment never 
> reconciles and fails to complete its finalizer.  This leads to the namespace 
> being blocked from deletion indefinitely, requiring manual manipulation to 
> remove the finalizer on the FlinkDeployment.
>  
> Namespace conditions:
> {code:java}
> conditions:
> - lastTransitionTime: '2023-04-18T22:17:48Z'
>   message: All resources successfully discovered
>   reason: ResourcesDiscovered
>   status: 'False'
>   type: NamespaceDeletionDiscoveryFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All legacy kube types successfully parsed
>   reason: ParsedGroupVersions
>   status: 'False'
>   type: NamespaceDeletionGroupVersionParsingFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All content successfully deleted, may be waiting on finalization
>   reason: ContentDeleted
>   status: 'False'
>   type: NamespaceDeletionContentFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some resources are remaining: flinkdeployments.flink.apache.org 
> has 2
> resource instances'
>   reason: SomeResourcesRemain
>   status: 'True'
>   type: NamespaceContentRemaining
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some content in the namespace has finalizers remaining: 
> flinkdeployments.flink.apache.org/finalizer
> in 2 resource instances'
>   reason: SomeFinalizersRemain
>   status: 'True'
>   type: NamespaceFinalizersRemaining
> phase: Terminating {code}
> FlinkDeployment example (some fields redacted):
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   creationTimestamp: '2023-03-23T18:27:02Z'
>   deletionGracePeriodSeconds: 0
>   deletionTimestamp: '2023-03-23T18:27:35Z'
>   finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>   generation: 3
>   name: 
>   namespace: 
>   resourceVersion: '10565277081'
>   uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
> spec:
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: '2'
>   flinkVersion: v1_16
>   image: 
>   job:
>     args: []
>     entryClass: 
>     jarURI: 
>     parallelism: 2
>     state: running
>     upgradeMode: stateless
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 1
>       memory: 2048m
>   logConfiguration:
>     log4j-console.properties: '# This affects logging for both user code and 
> Flink      rootLogger.level = INFO      rootLogger.appenderRef.console.ref = 
> ConsoleAppender      rootLogger.appenderRef.rolling.ref = RollingFileAppender 
>      # Uncomment this if you want to _only_ change Flink''s logging      
> #logger.flink.name = org.apache.flink      #logger.flink.level = INFO      # 
> The following lines keep the log level of common libraries/connectors on      
> # log level INFO. The root logger does not override this. You have to 
> manually      # change the log levels here.      logger.akka.name = akka      
> logger.akka.level = INFO      logger.kafka.name= org.apache.kafka      
> logger.kafka.level = INFO      logger.hadoop.name = org.apache.hadoop      
> logger.hadoop.level = INFO      logger.zookeeper.name = org.apache.zookeeper  
>     logger.zookeeper.level = INFO      # Log all infos to the console      
> appender.console.name = ConsoleAppender      appender.console.type = CONSOLE  
>     appender.console.layout.type = PatternLayout      
> appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
>       - %m%n      # Log all infos in the given rolling file      
> appender.rolling.name = RollingFileAppender      appender.rolling.type = 
> RollingFile      appender.rolling.append = false      
> appender.rolling.fileName = ${sys:log.file}      appender.rolling.filePattern 
> = ${sys:log.file}.%i      appender.rolling.layout.type = PatternLayout      
> appender.rolling.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p 

[jira] [Updated] (FLINK-31860) FlinkDeployments never finalize when namespace is deleted

2024-03-11 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-31860:
---
Priority: Major  (was: Blocker)

> FlinkDeployments never finalize when namespace is deleted
> -
>
> Key: FLINK-31860
> URL: https://issues.apache.org/jira/browse/FLINK-31860
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>Reporter: Jayme Howard
>Assignee: Jayme Howard
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> This appears to be a pretty straightforward issue, but I don't know the 
> codebase well enough to propose a fix.  When a FlinkDeployment is present in 
> a namespace, and the namespace is deleted, the FlinkDeployment never 
> reconciles and fails to complete its finalizer.  This leads to the namespace 
> being blocked from deletion indefinitely, requiring manual manipulation to 
> remove the finalizer on the FlinkDeployment.
>  
> Namespace conditions:
> {code:java}
> conditions:
> - lastTransitionTime: '2023-04-18T22:17:48Z'
>   message: All resources successfully discovered
>   reason: ResourcesDiscovered
>   status: 'False'
>   type: NamespaceDeletionDiscoveryFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All legacy kube types successfully parsed
>   reason: ParsedGroupVersions
>   status: 'False'
>   type: NamespaceDeletionGroupVersionParsingFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All content successfully deleted, may be waiting on finalization
>   reason: ContentDeleted
>   status: 'False'
>   type: NamespaceDeletionContentFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some resources are remaining: flinkdeployments.flink.apache.org 
> has 2
> resource instances'
>   reason: SomeResourcesRemain
>   status: 'True'
>   type: NamespaceContentRemaining
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some content in the namespace has finalizers remaining: 
> flinkdeployments.flink.apache.org/finalizer
> in 2 resource instances'
>   reason: SomeFinalizersRemain
>   status: 'True'
>   type: NamespaceFinalizersRemaining
> phase: Terminating {code}
> FlinkDeployment example (some fields redacted):
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   creationTimestamp: '2023-03-23T18:27:02Z'
>   deletionGracePeriodSeconds: 0
>   deletionTimestamp: '2023-03-23T18:27:35Z'
>   finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>   generation: 3
>   name: 
>   namespace: 
>   resourceVersion: '10565277081'
>   uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
> spec:
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: '2'
>   flinkVersion: v1_16
>   image: 
>   job:
>     args: []
>     entryClass: 
>     jarURI: 
>     parallelism: 2
>     state: running
>     upgradeMode: stateless
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 1
>       memory: 2048m
>   logConfiguration:
>     log4j-console.properties: '# This affects logging for both user code and 
> Flink      rootLogger.level = INFO      rootLogger.appenderRef.console.ref = 
> ConsoleAppender      rootLogger.appenderRef.rolling.ref = RollingFileAppender 
>      # Uncomment this if you want to _only_ change Flink''s logging      
> #logger.flink.name = org.apache.flink      #logger.flink.level = INFO      # 
> The following lines keep the log level of common libraries/connectors on      
> # log level INFO. The root logger does not override this. You have to 
> manually      # change the log levels here.      logger.akka.name = akka      
> logger.akka.level = INFO      logger.kafka.name= org.apache.kafka      
> logger.kafka.level = INFO      logger.hadoop.name = org.apache.hadoop      
> logger.hadoop.level = INFO      logger.zookeeper.name = org.apache.zookeeper  
>     logger.zookeeper.level = INFO      # Log all infos to the console      
> appender.console.name = ConsoleAppender      appender.console.type = CONSOLE  
>     appender.console.layout.type = PatternLayout      
> appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
>       - %m%n      # Log all infos in the given rolling file      
> appender.rolling.name = RollingFileAppender      appender.rolling.type = 
> RollingFile      appender.rolling.append = false      
> appender.rolling.fileName = ${sys:log.file}      appender.rolling.filePattern 
> = ${sys:log.file}.%i      appender.rolling.layout.type = PatternLayout      
> appender.rolling.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
>       - %m%n      appender.rolling.policies.type = Policies      
> 

[jira] [Closed] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-08 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34566.
--
Resolution: Fixed

merged to main 726e484c6a9b4121563829bc094b3eebeb8ddcf3

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Assignee: Fei Feng
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK to version 4.4.2 from version 4.3.0 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize was the same as corePoolSize), so we pass the configured 
> reconciliation parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> but in v4.4.2:
> The reconciliation thread pool is created as a custom executor to which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the effective thread pool size to be only 10, and the 
> majority of events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in 
> flink-kubernetes-operator so that we can control the reconciliation thread 
> pool directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!
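
To illustrate the difference between the two pool configurations (a generic 
java.util.concurrent sketch, not the JOSDK or operator code): with an unbounded 
work queue a ThreadPoolExecutor never grows beyond corePoolSize, so setting 
only maximumPoolSize has no effect, whereas a fixed-size pool sets both values 
to the desired parallelism:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ReconcilerPoolSketch {

    public static void main(String[] args) {
        int parallelism = 50;

        // With an unbounded queue, threads beyond corePoolSize are never
        // created: the pool stays at 10 threads and the remaining work sits
        // in the workQueue, which matches the behaviour described above.
        ExecutorService cappedAtCore = new ThreadPoolExecutor(
                10,              // corePoolSize: the effective limit here
                parallelism,     // maximumPoolSize: ignored with an unbounded queue
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());

        // A fixed-size pool sets corePoolSize == maximumPoolSize, so all
        // `parallelism` threads can actually run reconciliations concurrently.
        ExecutorService fixed = Executors.newFixedThreadPool(parallelism);

        cappedAtCore.shutdown();
        fixed.shutdown();
    }
}
{code}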



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-08 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-34566:
---
Fix Version/s: kubernetes-operator-1.8.0

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Assignee: Fei Feng
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK to version 4.4.2 from version 4.3.0 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize was the same as corePoolSize), so we pass the configured 
> reconciliation parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> but in v4.4.2:
> The reconciliation thread pool is created as a custom executor to which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the effective thread pool size to be only 10, and the 
> majority of events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in 
> flink-kubernetes-operator so that we can control the reconciliation thread 
> pool directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34580) Job run via REST erases "pipeline.classpaths" config

2024-03-07 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34580.
--
Resolution: Fixed

merged to main d0ce5349fdf1a611518eba20a169c475ee0b46c5

> Job run via REST erases "pipeline.classpaths" config
> 
>
> Key: FLINK-34580
> URL: https://issues.apache.org/jira/browse/FLINK-34580
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Ferenc Csaky
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The 
> [{{JarHandlerContext#applyToConfiguration}}|https://github.com/apache/flink/blob/e0b6c121eaf7aeb2974a45d199e452b022f07d29/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java#L134]
>  creates a {{PackagedProgram}} and then overwrites the {{pipeline.jars}} and 
> {{pipeline.classpaths}} values according to that newly created 
> {{{}PackagedProgram{}}}.
> Although that [{{PackagedProgram}} 
> init|https://github.com/apache/flink/blob/e0b6c121eaf7aeb2974a45d199e452b022f07d29/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java#L185]
>  does not set {{classpaths}} at all, it will always overwrite the 
> effective configuration with an empty value, even if it had something 
> previously.
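
A minimal sketch of the kind of fix this implies (illustrative only; the method 
and parameter names are made up and this is not the actual patch): read the 
previously configured pipeline.classpaths before the PackagedProgram-derived 
values are written back, and keep them when the program does not provide any:

{code:java}
import java.util.Collections;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public class ClasspathPreservingSketch {

    /**
     * Keep any pipeline.classpaths already present in the effective
     * configuration when program settings are re-applied. `programClasspaths`
     * stands in for whatever the PackagedProgram reports (empty in the
     * scenario described above).
     */
    public static void applyClasspaths(Configuration effectiveConfig, List<String> programClasspaths) {
        List<String> existing =
                effectiveConfig.getOptional(PipelineOptions.CLASSPATHS).orElse(Collections.emptyList());

        // Only overwrite when the program actually carries classpath entries;
        // otherwise preserve what the user configured.
        List<String> toApply = programClasspaths.isEmpty() ? existing : programClasspaths;
        effectiveConfig.set(PipelineOptions.CLASSPATHS, toApply);
    }
}
{code}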



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34580) Job run via REST erases "pipeline.classpaths" config

2024-03-07 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-34580:
--

Assignee: Ferenc Csaky

> Job run via REST erases "pipeline.classpaths" config
> 
>
> Key: FLINK-34580
> URL: https://issues.apache.org/jira/browse/FLINK-34580
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Ferenc Csaky
>Assignee: Ferenc Csaky
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The 
> [{{JarHandlerContext#applyToConfiguration}}|https://github.com/apache/flink/blob/e0b6c121eaf7aeb2974a45d199e452b022f07d29/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java#L134]
>  creates a {{PackagedProgram}} and then overwrites the {{pipeline.jars}} and 
> {{pipeline.classpaths}} values according to that newly created 
> {{{}PackagedProgram{}}}.
> Although that [{{PackagedProgram}} 
> init|https://github.com/apache/flink/blob/e0b6c121eaf7aeb2974a45d199e452b022f07d29/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java#L185]
>  does not set {{classpaths}} at all, it will always overwrite the 
> effective configuration with an empty value, even if it had something 
> previously.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34619) Do not wait for scaling completion in UPGRADE state with in-place scaling

2024-03-07 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-34619:
--

 Summary: Do not wait for scaling completion in UPGRADE state with 
in-place scaling
 Key: FLINK-34619
 URL: https://issues.apache.org/jira/browse/FLINK-34619
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Gyula Fora
Assignee: Gyula Fora
 Fix For: kubernetes-operator-1.8.0


The operator currently puts the resource into upgrading state after triggering 
in-place scaling and keeps observing until the desired parallelism is reached 
before moving to deployed / stable. 

However, due to how the adaptive scheduler works, this parallelism may never 
be reached, and that is expected.

We should simplify the logic to consider scaling "done" once the resource 
requirements have been set correctly, and then leave the rest to the adaptive 
scheduler.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34576) Flink deployment keep staying at RECONCILING/STABLE status

2024-03-07 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824297#comment-17824297
 ] 

Gyula Fora commented on FLINK-34576:


Ah I see, I thought the issue you linked was a fix for this specific problem in 
the JOSDK, but it's not.

I think it would be weird if we had to set a leader fence in the callback; why 
doesn't the JOSDK leader election mechanism already do this? If this is a 
problem with that, the issue should be fixed there, right?

> Flink deployment keep staying at RECONCILING/STABLE status
> --
>
> Key: FLINK-34576
> URL: https://issues.apache.org/jira/browse/FLINK-34576
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
>Reporter: chenyuzhi
>Priority: Major
> Attachments: image-2024-03-05-15-13-11-032.png
>
>
> The HA mode of flink-kubernetes-operator is being used. When one of the pods 
> of flink-kubernetes-operator restarts, flink-kubernetes-operator switches the 
> leader. However, some flinkdeployments have been in the 
> *JOB_STATUS=RECONCILING_STATE=STABLE* state for a long time.
> Through the cmd "kubectl describe flinkdeployment xxx", we can see the 
> following error, but there are no exceptions in the flink-kubernetes-operator 
> log.
>  
> {code:java}
> Status:
>   Cluster Info:
>     Flink - Revision:             b6d20ed @ 2023-12-20T10:01:39+01:00
>     Flink - Version:              1.14.0-GDC1.6.0
>     Total - Cpu:                  7.0
>     Total - Memory:               30064771072
>   Error:                          
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.lang.RuntimeException: Failed to load 
> configuration","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException","message":"java.lang.RuntimeException:
>  Failed to load 
> configuration","additionalMetadata":{}},{"type":"java.lang.RuntimeException","message":"Failed
>  to load configuration","additionalMetadata":{}}]}
>   Job Manager Deployment Status:  READY
>   Job Status:
>     Job Id:    cf44b5e73a1f263dd7d9f2c82be5216d
>     Job Name:  noah_stream_studio_1754211682_2218100380
>     Savepoint Info:
>       Last Periodic Savepoint Timestamp:  0
>       Savepoint History:
>     Start Time:     1705635107137
>     State:          RECONCILING
>     Update Time:    1709272530741
>   Lifecycle State:  STABLE {code}
>  
> !image-2024-03-05-15-13-11-032.png!
>  
> version:
> flink-kubernetes-operator: 1.6.1
> flink: 1.14.0/1.15.2 (flinkdeployment 1200+)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34588) FineGrainedSlotManager checks whether resources need to reconcile but doesn't act on the result

2024-03-06 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824082#comment-17824082
 ] 

Gyula Fora commented on FLINK-34588:


The links in the description don't seem to work :/ 

> FineGrainedSlotManager checks whether resources need to reconcile but doesn't 
> act on the result
> ---
>
> Key: FLINK-34588
> URL: https://issues.apache.org/jira/browse/FLINK-34588
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Major
>
> There are a few locations in {{FineGrainedSlotManager}} where we check 
> whether resources can/need to be reconciled but don't care about the result 
> and just trigger the resource update (e.g. in 
> [FineGrainedSlotManager:620|https://github.com/apache/flink/blob/c0d3e495f4c2316a80f251de77b05b943b5be1f8/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java#L620]
>  and 
> [FineGrainedSlotManager:676|https://github.com/apache/flink/blob/c0d3e495f4c2316a80f251de77b05b943b5be1f8/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java#L676]).
>  Looks like we could reduce the calls to the backend here.
> It's not having a major impact because this feature is only used in the 
> {{ActiveResourceManager}} which triggers 
> [checkResourceDeclarations|https://github.com/apache/flink/blob/c678244a3890273145a786b9e1bf1a4f96f6dcfd/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java#L331]
>  and reevaluates the {{resourceDeclarations}}. Not sure whether I missed 
> something here and there's actually a bigger issue with it. But considering 
> that nobody complained about it in the past, I'd assume that it's not a 
> severe issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34576) Flink deployment keep staying at RECONCILING/STABLE status

2024-03-06 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824041#comment-17824041
 ] 

Gyula Fora commented on FLINK-34576:


I am happy to review your PR if you try to bump the version

> Flink deployment keep staying at RECONCILING/STABLE status
> --
>
> Key: FLINK-34576
> URL: https://issues.apache.org/jira/browse/FLINK-34576
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
>Reporter: chenyuzhi
>Priority: Major
> Attachments: image-2024-03-05-15-13-11-032.png
>
>
> The HA mode of flink-kubernetes-operator is being used. When one of the pods 
> of flink-kubernetes-operator restarts, flink-kubernetes-operator switches the 
> leader. However, some flinkdeployments have been in the 
> *JOB_STATUS=RECONCILING_STATE=STABLE* state for a long time.
> Through the cmd "kubectl describe flinkdeployment xxx", we can see the 
> following error, but there are no exceptions in the flink-kubernetes-operator 
> log.
>  
> {code:java}
> Status:
>   Cluster Info:
>     Flink - Revision:             b6d20ed @ 2023-12-20T10:01:39+01:00
>     Flink - Version:              1.14.0-GDC1.6.0
>     Total - Cpu:                  7.0
>     Total - Memory:               30064771072
>   Error:                          
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.lang.RuntimeException: Failed to load 
> configuration","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException","message":"java.lang.RuntimeException:
>  Failed to load 
> configuration","additionalMetadata":{}},{"type":"java.lang.RuntimeException","message":"Failed
>  to load configuration","additionalMetadata":{}}]}
>   Job Manager Deployment Status:  READY
>   Job Status:
>     Job Id:    cf44b5e73a1f263dd7d9f2c82be5216d
>     Job Name:  noah_stream_studio_1754211682_2218100380
>     Savepoint Info:
>       Last Periodic Savepoint Timestamp:  0
>       Savepoint History:
>     Start Time:     1705635107137
>     State:          RECONCILING
>     Update Time:    1709272530741
>   Lifecycle State:  STABLE {code}
>  
> !image-2024-03-05-15-13-11-032.png!
>  
> version:
> flink-kubernetes-operator: 1.6.1
> flink: 1.14.0/1.15.2 (flinkdeployment 1200+)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34576) Flink deployment keep staying at RECONCILING/STABLE status

2024-03-06 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824039#comment-17824039
 ] 

Gyula Fora commented on FLINK-34576:


I think what you found could definitely explain the problem, so there is a good 
chance that the JOSDK version upgrade will resolve it :) 

> Flink deployment keep staying at RECONCILING/STABLE status
> --
>
> Key: FLINK-34576
> URL: https://issues.apache.org/jira/browse/FLINK-34576
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
>Reporter: chenyuzhi
>Priority: Major
> Attachments: image-2024-03-05-15-13-11-032.png
>
>
> The HA mode of flink-kubernetes-operator is being used. When one of the pods 
> of flink-kubernetes-operator restarts, flink-kubernetes-operator switches the 
> leader. However, some flinkdeployments have been in the 
> *JOB_STATUS=RECONCILING_STATE=STABLE* state for a long time.
> Through the cmd "kubectl describe flinkdeployment xxx", we can see the 
> following error, but there are no exceptions in the flink-kubernetes-operator 
> log.
>  
> {code:java}
> Status:
>   Cluster Info:
>     Flink - Revision:             b6d20ed @ 2023-12-20T10:01:39+01:00
>     Flink - Version:              1.14.0-GDC1.6.0
>     Total - Cpu:                  7.0
>     Total - Memory:               30064771072
>   Error:                          
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.lang.RuntimeException: Failed to load 
> configuration","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException","message":"java.lang.RuntimeException:
>  Failed to load 
> configuration","additionalMetadata":{}},{"type":"java.lang.RuntimeException","message":"Failed
>  to load configuration","additionalMetadata":{}}]}
>   Job Manager Deployment Status:  READY
>   Job Status:
>     Job Id:    cf44b5e73a1f263dd7d9f2c82be5216d
>     Job Name:  noah_stream_studio_1754211682_2218100380
>     Savepoint Info:
>       Last Periodic Savepoint Timestamp:  0
>       Savepoint History:
>     Start Time:     1705635107137
>     State:          RECONCILING
>     Update Time:    1709272530741
>   Lifecycle State:  STABLE {code}
>  
> !image-2024-03-05-15-13-11-032.png!
>  
> version:
> flink-kubernetes-operator: 1.6.1
> flink: 1.14.0/1.15.2 (flinkdeployment 1200+)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34576) Flink deployment keep staying at RECONCILING/STABLE status

2024-03-06 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824038#comment-17824038
 ] 

Gyula Fora commented on FLINK-34576:


Not sure how to repro this in a test easily; you could try upgrading the JOSDK 
version and testing with a custom build in your env to see if that solves the 
issue. 

> Flink deployment keep staying at RECONCILING/STABLE status
> --
>
> Key: FLINK-34576
> URL: https://issues.apache.org/jira/browse/FLINK-34576
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
>Reporter: chenyuzhi
>Priority: Major
> Attachments: image-2024-03-05-15-13-11-032.png
>
>
> The HA mode of flink-kubernetes-operator is being used. When one of the pods 
> of flink-kubernetes-operator restarts, flink-kubernetes-operator switches the 
> leader. However, some flinkdeployments have been in the 
> *JOB_STATUS=RECONCILING_STATE=STABLE* state for a long time.
> Through the cmd "kubectl describe flinkdeployment xxx", we can see the 
> following error, but there are no exceptions in the flink-kubernetes-operator 
> log.
>  
> {code:java}
> Status:
>   Cluster Info:
>     Flink - Revision:             b6d20ed @ 2023-12-20T10:01:39+01:00
>     Flink - Version:              1.14.0-GDC1.6.0
>     Total - Cpu:                  7.0
>     Total - Memory:               30064771072
>   Error:                          
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.lang.RuntimeException: Failed to load 
> configuration","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException","message":"java.lang.RuntimeException:
>  Failed to load 
> configuration","additionalMetadata":{}},{"type":"java.lang.RuntimeException","message":"Failed
>  to load configuration","additionalMetadata":{}}]}
>   Job Manager Deployment Status:  READY
>   Job Status:
>     Job Id:    cf44b5e73a1f263dd7d9f2c82be5216d
>     Job Name:  noah_stream_studio_1754211682_2218100380
>     Savepoint Info:
>       Last Periodic Savepoint Timestamp:  0
>       Savepoint History:
>     Start Time:     1705635107137
>     State:          RECONCILING
>     Update Time:    1709272530741
>   Lifecycle State:  STABLE {code}
>  
> !image-2024-03-05-15-13-11-032.png!
>  
> version:
> flink-kubernetes-operator: 1.6.1
> flink: 1.14.0/1.15.2 (flinkdeployment 1200+)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-05 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823682#comment-17823682
 ] 

Gyula Fora commented on FLINK-34566:


[~Fei Feng] here is the JOSDK side fix, can you please help review it?
[https://github.com/operator-framework/java-operator-sdk/pull/2262/files]

 

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Assignee: Fei Feng
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK to version 4.4.2 from version 4.3.0 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize was the same as corePoolSize), so we pass the configured 
> reconciliation parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> but in v4.4.2:
> The reconciliation thread pool is created as a custom executor to which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the effective thread pool size to be only 10, and the 
> majority of events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in 
> flink-kubernetes-operator so that we can control the reconciliation thread 
> pool directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34576) Flink deployment keep staying at RECONCILING/STABLE status

2024-03-05 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823613#comment-17823613
 ] 

Gyula Fora edited comment on FLINK-34576 at 3/5/24 1:19 PM:


I am a bit busy at the moment so it will take some time until I get to this. In 
the meantime, I have 1-2 questions:

1.  Is there a way to somehow repro this on a smaller case?
2. Have you tried operator version 1.7.0? We may have fixed the issue there 
already
3. Does it also affect newer Flink versions as well?
4. Can you share some relevant operator logs?

Thanks


was (Author: gyfora):
I am a bit busy at the moment so it will take some time until I get to this. In 
the meantime, I have 1-2 questions:

1.  Is there a way to somehow repro this on a smaller case?
2. Have you tried operator version 1.7.0? We may have fixed the issue there 
already
3. Does it also affect newer Flink versions as well?

Thanks

> Flink deployment keep staying at RECONCILING/STABLE status
> --
>
> Key: FLINK-34576
> URL: https://issues.apache.org/jira/browse/FLINK-34576
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
>Reporter: chenyuzhi
>Priority: Major
> Attachments: image-2024-03-05-15-13-11-032.png
>
>
> The HA mode of flink-kubernetes-operator is being used. When one of the pods 
> of flink-kubernetes-operator restarts, flink-kubernetes-operator switches the 
> leader. However, some flinkdeployments have been in the 
> *JOB_STATUS=RECONCILING_STATE=STABLE* state for a long time.
> Through the cmd "kubectl describe flinkdeployment xxx", we can see the 
> following error, but there are no exceptions in the flink-kubernetes-operator 
> log.
>  
> {code:java}
> Status:
>   Cluster Info:
>     Flink - Revision:             b6d20ed @ 2023-12-20T10:01:39+01:00
>     Flink - Version:              1.14.0-GDC1.6.0
>     Total - Cpu:                  7.0
>     Total - Memory:               30064771072
>   Error:                          
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.lang.RuntimeException: Failed to load 
> configuration","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException","message":"java.lang.RuntimeException:
>  Failed to load 
> configuration","additionalMetadata":{}},{"type":"java.lang.RuntimeException","message":"Failed
>  to load configuration","additionalMetadata":{}}]}
>   Job Manager Deployment Status:  READY
>   Job Status:
>     Job Id:    cf44b5e73a1f263dd7d9f2c82be5216d
>     Job Name:  noah_stream_studio_1754211682_2218100380
>     Savepoint Info:
>       Last Periodic Savepoint Timestamp:  0
>       Savepoint History:
>     Start Time:     1705635107137
>     State:          RECONCILING
>     Update Time:    1709272530741
>   Lifecycle State:  STABLE {code}
>  
> !image-2024-03-05-15-13-11-032.png!
>  
> version:
> flink-kubernetes-operator: 1.6.1
> flink: 1.14.0/1.15.2 (flinkdeployment 1200+)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34576) Flink deployment keep staying at RECONCILING/STABLE status

2024-03-05 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823613#comment-17823613
 ] 

Gyula Fora commented on FLINK-34576:


I am a bit busy at the moment, so it will take some time until I get to this. In 
the meantime, I have a few questions:

1. Is there a way to somehow reproduce this on a smaller case?
2. Have you tried operator version 1.7.0? We may have fixed the issue there 
already.
3. Does it also affect newer Flink versions as well?

Thanks

> Flink deployment keep staying at RECONCILING/STABLE status
> --
>
> Key: FLINK-34576
> URL: https://issues.apache.org/jira/browse/FLINK-34576
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
>Reporter: chenyuzhi
>Priority: Major
> Attachments: image-2024-03-05-15-13-11-032.png
>
>
> The HA mode of flink-kubernetes-operator is being used. When one of the pods 
> of flink-kubernetes-operator restarts, flink-kubernetes-operator switches the 
> leader. However, some flinkdeployments have been in the 
> *JOB_STATUS=RECONCILING_STATE=STABLE* state for a long time.
> Through the cmd "kubectl describe flinkdeployment xxx", we can see the following 
> error, but there are no exceptions in the flink-kubernetes-operator log.
>  
> {code:java}
> Status:
>   Cluster Info:
>     Flink - Revision:             b6d20ed @ 2023-12-20T10:01:39+01:00
>     Flink - Version:              1.14.0-GDC1.6.0
>     Total - Cpu:                  7.0
>     Total - Memory:               30064771072
>   Error:                          
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.lang.RuntimeException: Failed to load 
> configuration","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException","message":"java.lang.RuntimeException:
>  Failed to load 
> configuration","additionalMetadata":{}},{"type":"java.lang.RuntimeException","message":"Failed
>  to load configuration","additionalMetadata":{}}]}
>   Job Manager Deployment Status:  READY
>   Job Status:
>     Job Id:    cf44b5e73a1f263dd7d9f2c82be5216d
>     Job Name:  noah_stream_studio_1754211682_2218100380
>     Savepoint Info:
>       Last Periodic Savepoint Timestamp:  0
>       Savepoint History:
>     Start Time:     1705635107137
>     State:          RECONCILING
>     Update Time:    1709272530741
>   Lifecycle State:  STABLE {code}
>  
> !image-2024-03-05-15-13-11-032.png!
>  
> version:
> flink-kubernetes-operator: 1.6.1
> flink: 1.14.0/1.15.2 (flinkdeployment 1200+)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34576) Flink deployment keep staying at RECONCILING/STABLE status

2024-03-05 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-34576:
---
Description: 
The HA mode of flink-kubernetes-operator is being used. When one of the pods of 
flink-kubernetes-operator restarts, flink-kubernetes-operator switches the 
leader. However, some flinkdeployments have been in the 
*JOB_STATUS=RECONCILING_STATE=STABLE* state for a long time.

Through the cmd "kubectl describe flinkdeployment xxx", we can see the following 
error, but there are no exceptions in the flink-kubernetes-operator log.

 
{code:java}
Status:
  Cluster Info:
    Flink - Revision:             b6d20ed @ 2023-12-20T10:01:39+01:00
    Flink - Version:              1.14.0-GDC1.6.0
    Total - Cpu:                  7.0
    Total - Memory:               30064771072
  Error:                          
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.RuntimeException: Failed to load 
configuration","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException","message":"java.lang.RuntimeException:
 Failed to load 
configuration","additionalMetadata":{}},{"type":"java.lang.RuntimeException","message":"Failed
 to load configuration","additionalMetadata":{}}]}
  Job Manager Deployment Status:  READY
  Job Status:
    Job Id:    cf44b5e73a1f263dd7d9f2c82be5216d
    Job Name:  noah_stream_studio_1754211682_2218100380
    Savepoint Info:
      Last Periodic Savepoint Timestamp:  0
      Savepoint History:
    Start Time:     1705635107137
    State:          RECONCILING
    Update Time:    1709272530741
  Lifecycle State:  STABLE {code}
 
!image-2024-03-05-15-13-11-032.png!

 

version:

flink-kubernetes-operator: 1.6.1

flink: 1.14.0/1.15.2 (flinkdeployment 1200+)

 

  was:
The HA mode of flink-kubernetes-operator is being used. When one of the pods of 
flink-kubernetes-operator restarts, flink-kubernetes-operator switches the 
leader. However, some flinkdeployments have been in the 
*JOB_STATUS=RECONCILING_STATE=STABLE* state for a long time.

Through the cmd "kubectl describe flinkdeployment xxx", can see the following 
error, but there are no exceptions in the flink-kubernetes-operator log.

 
{code:java}
Status:
  Cluster Info:
    Flink - Revision:             b6d20ed @ 2023-12-20T10:01:39+01:00
    Flink - Version:              1.14.0-GDC1.6.0
    Total - Cpu:                  7.0
    Total - Memory:               30064771072
  Error:                          
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.RuntimeException: Failed to load 
configuration","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException","message":"java.lang.RuntimeException:
 Failed to load 
configuration","additionalMetadata":{}},{"type":"java.lang.RuntimeException","message":"Failed
 to load configuration","additionalMetadata":{}}]}
  Job Manager Deployment Status:  READY
  Job Status:
    Job Id:    cf44b5e73a1f263dd7d9f2c82be5216d
    Job Name:  noah_stream_studio_1754211682_2218100380
    Savepoint Info:
      Last Periodic Savepoint Timestamp:  0
      Savepoint History:
    Start Time:     1705635107137
    State:          RECONCILING
    Update Time:    1709272530741
  Lifecycle State:  STABLE {code}
 
!image-2024-03-05-15-13-11-032.png!

 

version:

flink-kubernetes-operator: 1.6.1

flink: 1.14.0/1.15.2 (flinkdeployment 1200+)

 

[~gyfora] 


> Flink deployment keep staying at RECONCILING/STABLE status
> --
>
> Key: FLINK-34576
> URL: https://issues.apache.org/jira/browse/FLINK-34576
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
>Reporter: chenyuzhi
>Priority: Major
> Attachments: image-2024-03-05-15-13-11-032.png
>
>
> The HA mode of flink-kubernetes-operator is being used. When one of the pods 
> of flink-kubernetes-operator restarts, flink-kubernetes-operator switches the 
> leader. However, some flinkdeployments have been in the 
> *JOB_STATUS=RECONCILING_STATE=STABLE* state for a long time.
> Through the cmd "kubectl describe flinkdeployment xxx", we can see the following 
> error, but there are no exceptions in the flink-kubernetes-operator log.
>  
> {code:java}
> Status:
>   Cluster Info:
>     Flink - Revision:             b6d20ed @ 2023-12-20T10:01:39+01:00
>     Flink - Version:              1.14.0-GDC1.6.0
>     Total - Cpu:     

[jira] [Assigned] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-04 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-34566:
--

Assignee: Fei Feng

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Assignee: Fei Feng
>Priority: Blocker
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and many session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize equal to corePoolSize), so we pass the reconciliation 
> parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> But in v4.4.2:
> The reconciliation thread pool is created as a custom executor for which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the thread pool size to stay at only 10, and the majority of 
> events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in the 
> Flink Kubernetes operator so that we control the reconciliation thread pool 
> directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-04 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823457#comment-17823457
 ] 

Gyula Fora commented on FLINK-34566:


Thanks [~Fei Feng] !

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Assignee: Fei Feng
>Priority: Blocker
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and many session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize equal to corePoolSize), so we pass the reconciliation 
> parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> But in v4.4.2:
> The reconciliation thread pool is created as a custom executor for which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the thread pool size to stay at only 10, and the majority of 
> events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in the 
> Flink Kubernetes operator so that we control the reconciliation thread pool 
> directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-04 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823433#comment-17823433
 ] 

Gyula Fora edited comment on FLINK-34566 at 3/5/24 6:02 AM:


Thanks for the detailed explanation, you are completely right; I missed this 
part. This actually sounds like a bug in JOSDK to me. Can you open a PR to fix it 
on our side by replacing it with a fixed thread pool? (Or by overriding the min 
parallelism to the max value as well.)


was (Author: gyfora):
Thanks for the detailed explanation, I missed this part. Sounds actually like a 
bug to me in JOSDK. Can you open a PR to fix it on our part by replacing it 
with the fixed thread pool? (Or overriding the min parallelism as well to the 
max value )

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Priority: Blocker
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and many session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize equal to corePoolSize), so we pass the reconciliation 
> parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> But in v4.4.2:
> The reconciliation thread pool is created as a custom executor for which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the thread pool size to stay at only 10, and the majority of 
> events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in the 
> Flink Kubernetes operator so that we control the reconciliation thread pool 
> directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-04 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823433#comment-17823433
 ] 

Gyula Fora commented on FLINK-34566:


Thanks for the detailed explanation, I missed this part. This actually sounds 
like a bug in JOSDK to me. Can you open a PR to fix it on our side by replacing 
it with a fixed thread pool? (Or by overriding the min parallelism to the max 
value as well.)

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Priority: Blocker
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and many session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize equal to corePoolSize), so we pass the reconciliation 
> parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> But in v4.4.2:
> The reconciliation thread pool is created as a custom executor for which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the thread pool size to stay at only 10, and the majority of 
> events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in the 
> Flink Kubernetes operator so that we control the reconciliation thread pool 
> directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-04 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34566.
--
Resolution: Not A Problem

I am closing this ticket for now; if you feel that this resolution is incorrect, 
please re-open it.

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Priority: Blocker
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and many session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize equal to corePoolSize), so we pass the reconciliation 
> parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> But in v4.4.2:
> The reconciliation thread pool is created as a custom executor for which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the thread pool size to stay at only 10, and the majority of 
> events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in the 
> Flink Kubernetes operator so that we control the reconciliation thread pool 
> directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33992) Add option to fetch the jar from private repository in FlinkSessionJob

2024-03-04 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823169#comment-17823169
 ] 

Gyula Fora commented on FLINK-33992:


It would be great if Flink itself had a way of downloading jars from a 
target URL for session job submissions, instead of the operator having to 
upload the jar beforehand. 

> Add option to fetch the jar from private repository in FlinkSessionJob
> --
>
> Key: FLINK-33992
> URL: https://issues.apache.org/jira/browse/FLINK-33992
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Sweta Kalakuntla
>Priority: Major
>
> FlinkSessionJob spec does not have a capability to download job jar from 
> remote private repository. It can currently only download from public 
> repositories. 
> Adding the capability to supply credentials for the *spec.job.jarURI* in 
> FlinkSessionJob will solve that problem.
> If I use initContainer to download the jar in FlinkDeployment and try to 
> access that in FlinkSessionJob, the operator is unable to find the jar in the 
> defined path.
> ---
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: job1
> spec:
>   deploymentName: session-cluster
>   job:
>     jarURI: file:///opt/flink/job.jar
>     parallelism: 4
>     upgradeMode: savepoint
> (edited)
> caused by: java.io.FileNotFoundException: /opt/flink/job.jar (No such file or 
> directory)
> at java.base/java.io.FileInputStream.open0(Native Method)
> at java.base/java.io.FileInputStream.open(Unknown Source)
> at java.base/java.io.FileInputStream.(Unknown Source)
> at 
> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
> at 
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
> at 
> org.apache.flink.kubernetes.operator.artifact.FileSystemBasedArtifactFetcher.fetch(FileSystemBasedArtifactFetcher.java:44)
> at 
> org.apache.flink.kubernetes.operator.artifact.ArtifactManager.fetch(ArtifactManager.java:63)
> at 
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.uploadJar(AbstractFlinkService.java:707)
> at 
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitJobToSessionCluster(AbstractFlinkService.java:212)
> at 
> org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:73)
> at 
> org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:44)
> at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:120)
> at 
> org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:109)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33992) Add option to fetch the jar from private repository in FlinkSessionJob

2024-03-04 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823168#comment-17823168
 ] 

Gyula Fora commented on FLINK-33992:


For session job submissions the jar generally has to be downloaded and 
submitted by the operator itself. So initContainers are not really applicable 
here, unless you mean downloading it into the session cluster's lib directory 
itself, which only works in some special cases.

> Add option to fetch the jar from private repository in FlinkSessionJob
> --
>
> Key: FLINK-33992
> URL: https://issues.apache.org/jira/browse/FLINK-33992
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Sweta Kalakuntla
>Priority: Major
>
> FlinkSessionJob spec does not have a capability to download job jar from 
> remote private repository. It can currently only download from public 
> repositories. 
> Adding the capability to supply credentials for the *spec.job.jarURI* in 
> FlinkSessionJob will solve that problem.
> If I use initContainer to download the jar in FlinkDeployment and try to 
> access that in FlinkSessionJob, the operator is unable to find the jar in the 
> defined path.
> ---
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: job1
> spec:
>   deploymentName: session-cluster
>   job:
>     jarURI: file:///opt/flink/job.jar
>     parallelism: 4
>     upgradeMode: savepoint
> (edited)
> caused by: java.io.FileNotFoundException: /opt/flink/job.jar (No such file or 
> directory)
> at java.base/java.io.FileInputStream.open0(Native Method)
> at java.base/java.io.FileInputStream.open(Unknown Source)
> at java.base/java.io.FileInputStream.(Unknown Source)
> at 
> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
> at 
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
> at 
> org.apache.flink.kubernetes.operator.artifact.FileSystemBasedArtifactFetcher.fetch(FileSystemBasedArtifactFetcher.java:44)
> at 
> org.apache.flink.kubernetes.operator.artifact.ArtifactManager.fetch(ArtifactManager.java:63)
> at 
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.uploadJar(AbstractFlinkService.java:707)
> at 
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitJobToSessionCluster(AbstractFlinkService.java:212)
> at 
> org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:73)
> at 
> org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:44)
> at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:120)
> at 
> org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:109)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-04 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823165#comment-17823165
 ] 

Gyula Fora commented on FLINK-34566:


{noformat}
A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize) 
according to the bounds set by corePoolSize (see getCorePoolSize) and 
maximumPoolSize (see getMaximumPoolSize). When a new task is submitted in 
method execute(Runnable), if fewer than corePoolSize threads are running, a new 
thread is created to handle the request, even if other worker threads are idle. 
Else if fewer than maximumPoolSize threads are running, a new thread will be 
created to handle the request only if the queue is full. By setting 
corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. 
By setting maximumPoolSize to an essentially unbounded value such as 
Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of 
concurrent tasks. Most typically, core and maximum pool sizes are set only upon 
construction, but they may also be changed dynamically using setCorePoolSize 
and setMaximumPoolSize.{noformat}

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Priority: Blocker
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and many session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize equal to corePoolSize), so we pass the reconciliation 
> parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> But in v4.4.2:
> The reconciliation thread pool is created as a custom executor for which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the thread pool size to stay at only 10, and the majority of 
> events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in the 
> Flink Kubernetes operator so that we control the reconciliation thread pool 
> directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-04 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823166#comment-17823166
 ] 

Gyula Fora commented on FLINK-34566:


From the Java docs:

"A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize) 
according to the bounds set by corePoolSize (see getCorePoolSize) and 
maximumPoolSize (see getMaximumPoolSize). When a new task is submitted in 
method execute(Runnable), if fewer than corePoolSize threads are running, a new 
thread is created to handle the request, even if other worker threads are idle. 
Else if fewer than maximumPoolSize threads are running, a new thread will be 
created to handle the request only if the queue is full. By setting 
corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. 
By setting maximumPoolSize to an essentially unbounded value such as 
Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of 
concurrent tasks. Most typically, core and maximum pool sizes are set only upon 
construction, but they may also be changed dynamically using setCorePoolSize 
and setMaximumPoolSize."
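A small, self-contained illustration of the behavior quoted above (not part of the 
ticket): with corePoolSize 10, a larger maximumPoolSize and an unbounded work queue, 
the pool never grows past 10 threads, because extra threads are only created once 
the queue is full.

{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizeDemo {
    public static void main(String[] args) throws InterruptedException {
        // corePoolSize = 10, maximumPoolSize = 50, unbounded work queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());

        // Submit 100 slow tasks; only 10 run concurrently, the rest queue up.
        for (int i = 0; i < 100; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(1_000);
                } catch (InterruptedException ignored) {
                }
            });
        }

        Thread.sleep(200);
        System.out.println("pool size: " + pool.getPoolSize());      // prints 10
        System.out.println("queued:    " + pool.getQueue().size());  // roughly 90
        pool.shutdownNow();
    }
}
{code}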

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Priority: Blocker
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and many session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize equal to corePoolSize), so we pass the reconciliation 
> parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> But in v4.4.2:
> The reconciliation thread pool is created as a custom executor for which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the thread pool size to stay at only 10, and the majority of 
> events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in the 
> Flink Kubernetes operator so that we control the reconciliation thread pool 
> directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-04 Thread Gyula Fora (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34566 ]


Gyula Fora deleted comment on FLINK-34566:


was (Author: gyfora):
{noformat}
A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize) 
according to the bounds set by corePoolSize (see getCorePoolSize) and 
maximumPoolSize (see getMaximumPoolSize). When a new task is submitted in 
method execute(Runnable), if fewer than corePoolSize threads are running, a new 
thread is created to handle the request, even if other worker threads are idle. 
Else if fewer than maximumPoolSize threads are running, a new thread will be 
created to handle the request only if the queue is full. By setting 
corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. 
By setting maximumPoolSize to an essentially unbounded value such as 
Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of 
concurrent tasks. Most typically, core and maximum pool sizes are set only upon 
construction, but they may also be changed dynamically using setCorePoolSize 
and setMaximumPoolSize.{noformat}

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Priority: Blocker
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and many session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize equal to corePoolSize), so we pass the reconciliation 
> parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> But in v4.4.2:
> The reconciliation thread pool is created as a custom executor for which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the thread pool size to stay at only 10, and the majority of 
> events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in the 
> Flink Kubernetes operator so that we control the reconciliation thread pool 
> directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-04 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823164#comment-17823164
 ] 

Gyula Fora commented on FLINK-34566:


Even if the core pool size is 10, the maximumPoolSize defines the maximum number 
of threads that can be created. After 1 minute of idling some of these threads 
are released and the pool can shrink when not in use. 

This is how it was designed.

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Priority: Blocker
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and many session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize equal to corePoolSize), so we pass the reconciliation 
> parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> But in v4.4.2:
> The reconciliation thread pool is created as a custom executor for which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the thread pool size to stay at only 10, and the majority of 
> events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in the 
> Flink Kubernetes operator so that we control the reconciliation thread pool 
> directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-04 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823163#comment-17823163
 ] 

Gyula Fora commented on FLINK-34566:


Looking at this in detail, I think it should work as expected. The thread pool 
max size is set correctly, and we can have at most the defined parallelism 
number of threads.

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Priority: Blocker
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and many session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize equal to corePoolSize), so we pass the reconciliation 
> parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> But in v4.4.2:
> The reconciliation thread pool is created as a custom executor for which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the thread pool size to stay at only 10, and the majority of 
> events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in the 
> Flink Kubernetes operator so that we control the reconciliation thread pool 
> directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34566) Flink Kubernetes Operator reconciliation parallelism setting not work

2024-03-03 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823049#comment-17823049
 ] 

Gyula Fora commented on FLINK-34566:


You are saying that this is a bug in the JOSDK? 
[https://github.com/operator-framework/java-operator-sdk]

Can you please also report this there? Maybe it's already fixed and we can 
upgrade to a newer version.

> Flink Kubernetes Operator reconciliation parallelism setting not work
> -
>
> Key: FLINK-34566
> URL: https://issues.apache.org/jira/browse/FLINK-34566
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Fei Feng
>Priority: Blocker
> Attachments: image-2024-03-04-10-58-37-679.png, 
> image-2024-03-04-11-17-22-877.png, image-2024-03-04-11-31-44-451.png
>
>
> After we upgraded JOSDK from version 4.3.0 to version 4.4.2 in FLINK-33005, 
> we can no longer enlarge the reconciliation parallelism, and the maximum 
> reconciliation parallelism is only 10. This results in FlinkDeployment and 
> SessionJob reconciliation delays of about 10-30 seconds when we have a 
> large-scale Flink session cluster and many session jobs in the k8s cluster.
>  
> After investigating and validating, I found the reason is that the logic for 
> reconciliation thread pool creation in JOSDK changed significantly between 
> these two versions. 
> v4.3.0: 
> The reconciliation thread pool was created as a FixedThreadPool 
> (maximumPoolSize equal to corePoolSize), so we pass the reconciliation 
> parallelism and get a thread pool that matches our expectations.
> !image-2024-03-04-10-58-37-679.png|width=497,height=91!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.3.0/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java#L198]
>  
> But in v4.4.2:
> The reconciliation thread pool is created as a custom executor for which we 
> can pass corePoolSize and maximumPoolSize. The problem is that we only set 
> the maximumPoolSize of the thread pool, while the corePoolSize defaults to 
> 10. This causes the thread pool size to stay at only 10, and the majority of 
> events are placed in the workQueue for a while.  
> !image-2024-03-04-11-17-22-877.png|width=569,height=112!
> [https://github.com/operator-framework/java-operator-sdk/blob/v4.4.2/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java#L37]
>  
> The solution is also simple: we can create and pass the thread pool in the 
> Flink Kubernetes operator so that we control the reconciliation thread pool 
> directly, such as:
> !image-2024-03-04-11-31-44-451.png|width=483,height=98!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34561) Downgrading flink-kubernetes-operator causes failure

2024-03-01 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34561.
--
Resolution: Not A Problem

This works as expected. The CRD and the operator are backward compatible, not 
forward compatible. As you can see in the docs:

```
This means that if you have Flink resources deployed (and Flink applications 
running), you can still safely upgrade to newer versions of the operator and 
CRD without any problems.
```

We do not currently provide any guarantee for downgrading the operator as it 
may populate new status fields that are not compatible with previous operator 
versions.

However, newer operator versions are much more flexible with JSON deserialisation, 
so I would assume that going back to 1.6 from 1.7 would not be an issue; however, 
we do not have an explicit guarantee there.

We recommend upgrading to the latest prod release: 1.7.0

> Downgrading flink-kubernetes-operator causes failure
> 
>
> Key: FLINK-34561
> URL: https://issues.apache.org/jira/browse/FLINK-34561
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Avi Sanwal
>Priority: Major
>
> Hi,
> We are currently using flink-kubernetes-operator 1.4 version (with v1beta1 
> CRD) to manage flink deployments.
> We tried an upgrade to version 1.6 version, but when we try to rollback to 
> 1.4, noticed that flink-operator fails to come up with this execption:
> {noformat}
> Exception in thread "main" io.javaoperatorsdk.operator.OperatorException: 
> Error starting operator
>   at io.javaoperatorsdk.operator.Operator.start(Operator.java:125)
>   at 
> org.apache.flink.kubernetes.operator.FlinkOperator.run(FlinkOperator.java:215)
>   at 
> org.apache.flink.kubernetes.operator.FlinkOperator.main(FlinkOperator.java:229)
> Caused by: io.javaoperatorsdk.operator.OperatorException: 
> java.lang.IllegalStateException: 
> com.fasterxml.jackson.databind.JsonMappingException: Could not deserialize 
> spec, this indicates a bug... (through reference chain: 
> org.apache.flink.kubernetes.operator.api.FlinkDeployment["status"]->org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus["lifecycleState"])
>   at 
> io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.lambda$executeAndWaitForAllToComplete$2(ExecutorServiceManager.java:107)
>   at java.base/java.util.ArrayList.forEach(Unknown Source)
>   at 
> io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.executeAndWaitForAllToComplete(ExecutorServiceManager.java:102)
>   at 
> io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(ExecutorServiceManager.java:82)
>   at 
> io.javaoperatorsdk.operator.ControllerManager.start(ControllerManager.java:36)
>   at io.javaoperatorsdk.operator.Operator.start(Operator.java:120)
>   ... 2 more
> Caused by: java.lang.IllegalStateException: 
> com.fasterxml.jackson.databind.JsonMappingException: Could not deserialize 
> spec, this indicates a bug... (through reference chain: 
> org.apache.flink.kubernetes.operator.api.FlinkDeployment["status"]->org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus["lifecycleState"])
>   at 
> io.javaoperatorsdk.operator.api.config.ConfigurationService$1.clone(ConfigurationService.java:112)
>   at java.base/java.util.Optional.map(Unknown Source)
>   at 
> io.javaoperatorsdk.operator.processing.event.source.informer.InformerManager.get(InformerManager.java:175)
>   at 
> io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource.get(ManagedInformerEventSource.java:110)
>   at 
> io.javaoperatorsdk.operator.processing.event.EventProcessor.submitReconciliationExecution(EventProcessor.java:128)
>   at 
> io.javaoperatorsdk.operator.processing.event.EventProcessor.handleMarkedEventForResource(EventProcessor.java:120)
>   at 
> io.javaoperatorsdk.operator.processing.event.EventProcessor.handleAlreadyMarkedEvents(EventProcessor.java:376)
>   at 
> io.javaoperatorsdk.operator.processing.event.EventProcessor.start(EventProcessor.java:371)
>   at 
> io.javaoperatorsdk.operator.processing.Controller.start(Controller.java:336)
>   at 
> io.javaoperatorsdk.operator.ControllerManager.lambda$start$0(ControllerManager.java:37)
>   at 
> io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.lambda$executeAndWaitForAllToComplete$0(ExecutorServiceManager.java:96)
>   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>   at java.base/java.lang.Thread.run(Unknown Source)
> 

[jira] [Commented] (FLINK-34524) Scale down JobManager deployment to 0 before deletion

2024-02-26 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820991#comment-17820991
 ] 

Gyula Fora commented on FLINK-34524:


cc [~mateczagany] 

> Scale down JobManager deployment to 0 before deletion
> -
>
> Key: FLINK-34524
> URL: https://issues.apache.org/jira/browse/FLINK-34524
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>
> We recently improved the JM deployment deletion mechanism; however, it seems 
> like task manager pod deletion can sometimes get stuck for a couple of 
> minutes in native mode if we simply try to delete everything at once.
> It speeds up the process and leads to a cleaner shutdown if we scale the 
> JM deployment down to 0 (shutting down the JM pods first) and then perform 
> the deletion.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34524) Scale down JobManager deployment to 0 before deletion

2024-02-26 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-34524:
--

 Summary: Scale down JobManager deployment to 0 before deletion
 Key: FLINK-34524
 URL: https://issues.apache.org/jira/browse/FLINK-34524
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora


We recently improved the JM deployment deletion mechanism; however, it seems 
like task manager pod deletion can sometimes get stuck for a couple of minutes 
in native mode if we simply try to delete everything at once.

It speeds up the process and leads to a cleaner shutdown if we scale the JM 
deployment down to 0 (shutting down the JM pods first) and then perform the deletion.
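A rough sketch of the proposed order of operations using the fabric8 client 
(illustrative only, not the operator's actual implementation; names and error 
handling are simplified):

{code:java}
import io.fabric8.kubernetes.client.KubernetesClient;

public final class JmShutdownSketch {

    static void scaleDownThenDelete(KubernetesClient client, String namespace, String name) {
        // Step 1: shut down the JM pods first by scaling the deployment to 0 replicas.
        client.apps().deployments()
                .inNamespace(namespace)
                .withName(name)
                .scale(0);

        // Step 2: once the pods are terminating, delete the (now empty) deployment itself.
        client.apps().deployments()
                .inNamespace(namespace)
                .withName(name)
                .delete();
    }
}
{code}

In practice the operator would also wait for the JM pods to actually terminate 
between the two steps before deleting the deployment.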



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-26 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820657#comment-17820657
 ] 

Gyula Fora commented on FLINK-34451:


I opened a new ticket to track this issue explicitly for the adaptive 
scheduler: https://issues.apache.org/jira/browse/FLINK-34518

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.7.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34518) Adaptive Scheduler restores from empty state if JM fails during restarting state

2024-02-26 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820656#comment-17820656
 ] 

Gyula Fora commented on FLINK-34518:


cc [~mapohl] [~chesnay] 

> Adaptive Scheduler restores from empty state if JM fails during restarting 
> state
> 
>
> Key: FLINK-34518
> URL: https://issues.apache.org/jira/browse/FLINK-34518
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.18.1
>Reporter: Gyula Fora
>Priority: Critical
>
> If a JobManager failover occurs while the Job is in a Restarting state, the 
> HA metadata is deleted (as if it was a globally terminal state) and the job 
> restarts from an empty state after the JM comes back up:
> Jobmanager killed after killing Taskmanager (restarting phase):
> {noformat}
> 2024-02-26 10:10:12,147 DEBUG 
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Ignore 
> TaskManager pod that is already added: autoscaling-example-taskmanager-3-2
> 2024-02-26 10:10:13,799 DEBUG 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Trigger heartbeat request.
> 2024-02-26 10:10:13,799 DEBUG 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Trigger heartbeat request.
> 2024-02-26 10:10:13,799 DEBUG org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Received heartbeat request from 
> 9b7e17b75812ab60ecf028e02368d0c2.
> 2024-02-26 10:10:13,799 DEBUG 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Received heartbeat from 251c25cf794e3c9396fc02306613507b.
> 2024-02-26 10:10:14,091 DEBUG 
> org.apache.pekko.remote.transport.netty.NettyTransport       [] - Remote 
> connection to [/10.244.0.120:55647] was disconnected because of [id: 
> 0x4a61a791, /10.244.0.120:55647 :> /10.244.0.118:6123] DISCONNECTED
> 2024-02-26 10:10:14,091 DEBUG 
> org.apache.pekko.remote.transport.ProtocolStateActor         [] - Association 
> between local [tcp://flink@10.244.0.118:6123] and remote 
> [tcp://flink@10.244.0.120:55647] was disassociated because the 
> ProtocolStateActor failed: Unknown
> 2024-02-26 10:10:14,092 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2024-02-26 10:10:14,094 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting 
> KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. 
> Diagnostics Cluster entrypoint has been closed externally..
> 2024-02-26 10:10:14,095 INFO  org.apache.flink.runtime.blob.BlobServer        
>              [] - Stopped BLOB server at 0.0.0.0:6124
> 2024-02-26 10:10:14,095 INFO  
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
> down rest endpoint.
> 2024-02-26 10:10:14,315 DEBUG 
> org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Watcher [] - 
> Watcher closed
> 2024-02-26 10:10:14,511 DEBUG org.apache.pekko.actor.CoordinatedShutdown      
>              [] - Performing task [terminate-system] in CoordinatedShutdown 
> phase [actor-system-terminate]
> 2024-02-26 10:10:14,595 INFO  
> org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - 
> Shutting down remote daemon.
> 2024-02-26 10:10:14,596 INFO  
> org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote 
> daemon shut down; proceeding with flushing remote transports.{noformat}
> When the new JM comes back, it doesn't find any checkpoints, as the HA metadata 
> was deleted (we couldn't see this in the logs of the shutting down JM):
> {noformat}
> 2024-02-26 10:10:30,294 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
> Recovering checkpoints from 
> KubernetesStateHandleStore{configMapName='autoscaling-example-5ddd0b1ba346d3bfd5ef53a63772e43c-config-map'}.2024-02-26
>  10:10:30,394 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
> Found 0 checkpoints in 
> KubernetesStateHandleStore{configMapName='autoscaling-example-5ddd0b1ba346d3bfd5ef53a63772e43c-config-map'}.{noformat}
> Even the main method is re-run and the jobgraph is regenerated (which is 
> expected given the HA metadata was removed incorrectly)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34518) Adaptive Scheduler restores from empty state if JM fails during restarting state

2024-02-26 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-34518:
--

 Summary: Adaptive Scheduler restores from empty state if JM fails 
during restarting state
 Key: FLINK-34518
 URL: https://issues.apache.org/jira/browse/FLINK-34518
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Coordination
Affects Versions: 1.18.1
Reporter: Gyula Fora


If a JobManager failover occurs while the Job is in a Restarting state, the HA 
metadata is deleted (as if it was a globally terminal state) and the job 
restarts from an empty state after the JM comes back up:

Jobmanager killed after killing Taskmanager (restarting phase):
{noformat}
2024-02-26 10:10:12,147 DEBUG 
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Ignore 
TaskManager pod that is already added: autoscaling-example-taskmanager-3-2
2024-02-26 10:10:13,799 DEBUG 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Trigger heartbeat request.
2024-02-26 10:10:13,799 DEBUG 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Trigger heartbeat request.
2024-02-26 10:10:13,799 DEBUG org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Received heartbeat request from 
9b7e17b75812ab60ecf028e02368d0c2.
2024-02-26 10:10:13,799 DEBUG 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Received heartbeat from 251c25cf794e3c9396fc02306613507b.
2024-02-26 10:10:14,091 DEBUG 
org.apache.pekko.remote.transport.netty.NettyTransport       [] - Remote 
connection to [/10.244.0.120:55647] was disconnected because of [id: 
0x4a61a791, /10.244.0.120:55647 :> /10.244.0.118:6123] DISCONNECTED
2024-02-26 10:10:14,091 DEBUG 
org.apache.pekko.remote.transport.ProtocolStateActor         [] - Association 
between local [tcp://flink@10.244.0.118:6123] and remote 
[tcp://flink@10.244.0.120:55647] was disassociated because the 
ProtocolStateActor failed: Unknown
2024-02-26 10:10:14,092 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - RECEIVED 
SIGNAL 15: SIGTERM. Shutting down as requested.
2024-02-26 10:10:14,094 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. 
Diagnostics Cluster entrypoint has been closed externally..
2024-02-26 10:10:14,095 INFO  org.apache.flink.runtime.blob.BlobServer          
           [] - Stopped BLOB server at 0.0.0.0:6124
2024-02-26 10:10:14,095 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
down rest endpoint.
2024-02-26 10:10:14,315 DEBUG 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Watcher [] - 
Watcher closed
2024-02-26 10:10:14,511 DEBUG org.apache.pekko.actor.CoordinatedShutdown        
           [] - Performing task [terminate-system] in CoordinatedShutdown phase 
[actor-system-terminate]
2024-02-26 10:10:14,595 INFO  
org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting 
down remote daemon.
2024-02-26 10:10:14,596 INFO  
org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote 
daemon shut down; proceeding with flushing remote transports.{noformat}
When the new JM comes back, it doesn't find any checkpoints, as the HA metadata 
was deleted (we couldn't see this in the logs of the shutting down JM):


{noformat}
2024-02-26 10:10:30,294 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
Recovering checkpoints from 
KubernetesStateHandleStore{configMapName='autoscaling-example-5ddd0b1ba346d3bfd5ef53a63772e43c-config-map'}.2024-02-26
 10:10:30,394 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
Found 0 checkpoints in 
KubernetesStateHandleStore{configMapName='autoscaling-example-5ddd0b1ba346d3bfd5ef53a63772e43c-config-map'}.{noformat}
Even the main method is re-run and the jobgraph is regenerated (which is 
expected given the HA metadata was removed incorrectly)
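
For debugging, a small sketch of how one could check whether the HA ConfigMap still 
contains checkpoint entries (the ConfigMap name is copied from the log above; the 
"checkpointID-" key prefix is an assumption about how the Kubernetes state handle 
store names its entries):

{code:java}
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class HaMetadataCheck {
    public static void main(String[] args) {
        String namespace = "default";  // illustrative
        String configMapName =
                "autoscaling-example-5ddd0b1ba346d3bfd5ef53a63772e43c-config-map";

        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            ConfigMap cm = client.configMaps()
                    .inNamespace(namespace)
                    .withName(configMapName)
                    .get();
            if (cm == null || cm.getData() == null) {
                System.out.println("HA ConfigMap not found - nothing to restore from.");
                return;
            }
            // Count entries that look like checkpoint pointers.
            long checkpointEntries = cm.getData().keySet().stream()
                    .filter(key -> key.startsWith("checkpointID-"))
                    .count();
            System.out.println("Checkpoint entries in HA ConfigMap: " + checkpointEntries);
        }
    }
}
{code}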



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-26 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820622#comment-17820622
 ] 

Gyula Fora commented on FLINK-34451:


It looks like there is a race condition between handling the TaskManager 
failure and the JobManager shutdown in the adaptive scheduler, which leads to a 
terminal failed state.

If I only terminate the JobManager (scale down the k8s Deployment replicas to 
0), then basically nothing happens to the HA metadata (which is expected):


{noformat}
2024-02-26 07:59:07,413 DEBUG 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.informers.impl.cache.Reflector
 [] - Event received MODIFIED ConfigMap resourceVersion v6719805 for 
v1/namespaces/default/configmaps
2024-02-26 07:59:08,096 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - RECEIVED 
SIGNAL 15: SIGTERM. Shutting down as requested.
2024-02-26 07:59:08,097 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. 
Diagnostics Cluster entrypoint has been closed externally..
2024-02-26 07:59:08,098 INFO  org.apache.flink.runtime.blob.BlobServer          
           [] - Stopped BLOB server at 0.0.0.0:6124
2024-02-26 07:59:08,308 DEBUG 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Watcher [] - 
Watcher closed
2024-02-26 07:59:08,508 WARN  org.apache.pekko.actor.CoordinatedShutdown        
           [] - Could not addJvmShutdownHook, due to: Shutdown in 
progress{noformat}

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.7.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - 

[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-25 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820604#comment-17820604
 ] 

Gyula Fora commented on FLINK-34451:


I took a closer look at this and it also happens with the default restart 
strategy. 

The relevant log segment during shutdown (if we simply delete the deployment 
object as the last-state suspend does in the operator)
{code:java}
2024-02-26 07:05:04,412 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map -> Sink: 
Unnamed (1/1) 
(6a8685c5160d149204dd115f396dcb38_90bea66de1c231edf33913ecd54406c1_0_1) 
switched from RUNNING to FAILED on autoscaling-example-taskmanager-1-1 @ 
10.244.0.54 (dataPort=35905).
org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.
    at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:481)
 ~[flink-dist-1.18.1.jar:1.18.1]
    at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239)
 ~[flink-dist-1.18.1.jar:1.18.1]
    ...
2024-02-26 07:05:04,415 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
down rest endpoint.
2024-02-26 07:05:04,415 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding 
the results produced by task execution 
6a8685c5160d149204dd115f396dcb38_90bea66de1c231edf33913ecd54406c1_0_1.
2024-02-26 07:05:04,416 DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sending out 
cancel request, to remove task execution from TaskManager.
2024-02-26 07:05:04,417 WARN  
org.apache.flink.runtime.taskmanager.TaskManagerLocation     [] - No hostname 
could be resolved for the IP address 10.244.0.54, using IP address as host 
name. Local input split assignment (such as for HDFS files) may be impacted.
2024-02-26 07:05:04,417 INFO  
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Restarting 
job.
org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.
    at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:481)
 ~[flink-dist-1.18.1.jar:1.18.1]
    at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239)
 ~[flink-dist-1.18.1.jar:1.18.1]
    ...
2024-02-26 07:05:04,417 DEBUG 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Transition 
from state Executing to Restarting.
2024-02-26 07:05:04,417 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job 
Autoscaling Example (5ddd0b1ba346d3bfd5ef53a63772e43c) switched from state 
RUNNING to CANCELLING.
2024-02-26 07:05:04,417 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
Sequence Source -> Filter (1/1) 
(6a8685c5160d149204dd115f396dcb38_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
switched from RUNNING to CANCELING.
2024-02-26 07:05:04,418 DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Returning 
logical slot to shared slot (SlotRequestId{cda2d4d04521eed8b88245bb0eb497e0})
2024-02-26 07:05:04,418 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
Sequence Source -> Filter (1/1) 
(6a8685c5160d149204dd115f396dcb38_cbc357ccb763df2852fee8c4fc7d55f2_0_1) 
switched from CANCELING to CANCELED.
2024-02-26 07:05:04,418 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding 
the results produced by task execution 
6a8685c5160d149204dd115f396dcb38_cbc357ccb763df2852fee8c4fc7d55f2_0_1.
2024-02-26 07:05:04,419 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding 
the results produced by task execution 
6a8685c5160d149204dd115f396dcb38_cbc357ccb763df2852fee8c4fc7d55f2_0_1.
2024-02-26 07:05:04,419 DEBUG 
org.apache.flink.runtime.scheduler.adaptive.allocator.SharedSlot [] - Returning 
logical slot to shared slot (SlotRequestId{cda2d4d04521eed8b88245bb0eb497e0})
2024-02-26 07:05:04,419 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job 
Autoscaling Example (5ddd0b1ba346d3bfd5ef53a63772e43c) switched from state 
CANCELLING to CANCELED.
2024-02-26 07:05:04,420 DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - 
ExecutionGraph 5ddd0b1ba346d3bfd5ef53a63772e43c reached terminal state CANCELED.
2024-02-26 07:05:04,420 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping 
checkpoint coordinator for job 5ddd0b1ba346d3bfd5ef53a63772e43c.
2024-02-26 07:05:04,420 INFO  
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
Releasing slot [d3be5ab8662c10de36088fddeb531b59].
org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.
    at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:481)
 ~[flink-dist-1.18.1.jar:1.18.1]
    at 

[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-23 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820276#comment-17820276
 ] 

Gyula Fora commented on FLINK-34451:


I will definitely try this on Monday; I was just curious if you have any 
working hypothesis to validate first :) Thanks for narrowing this down

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.7.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-23 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820272#comment-17820272
 ] 

Gyula Fora commented on FLINK-34451:


To me the logs are not very surprising. The way it is currently implemented, the 
operator sends the last savepoint as the savepoint parameter even if it assumes 
that last state will be used.

It’s a bit weird and could be removed …

but in any case it expects Flink to pick up the last state through the failover 
mechanism from the HA metadata (that’s how last state upgrade works)

so to me the logs are not so informative; the big question is why the adaptive 
scheduler would ignore the HA metadata in this particular case. The operator 
checked that it’s there but Flink ignored it and instead started from the 
savepoint (which was set somewhat unnecessarily but expected)
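
To make the precedence concrete, a minimal sketch of the two settings in play during 
a last-state upgrade, using standard Flink configuration keys (the savepoint path is 
purely illustrative and this is not the operator's actual submission code):

{code:java}
import org.apache.flink.configuration.Configuration;

public class LastStateUpgradeConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // The operator passes the last known savepoint as the initial restore path...
        conf.setString("execution.savepoint.path",
                "s3://bucket/savepoints/savepoint-xxxx"); // illustrative path

        // ...but with Kubernetes HA enabled the job is expected to recover from the
        // checkpoints referenced by the HA metadata, which should win over the
        // initial savepoint path above.
        conf.setString("high-availability", "kubernetes");

        System.out.println(conf);
    }
}
{code}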

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.7.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers 

[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-23 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820209#comment-17820209
 ] 

Gyula Fora commented on FLINK-34451:


That's a good catch, if this is a bug related to adaptive scheduler + 
exponential restart strategy then we should be able to track it down. Do you 
have any hypothesis what may cause the issue?

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.7.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29696) [Doc] Operator helm install command points to wrong repo

2024-02-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819941#comment-17819941
 ] 

Gyula Fora commented on FLINK-29696:


[~domenicbove] feel free to open a doc improvement PR if you think it's better 
that way :) 

> [Doc] Operator helm install command points to wrong repo
> 
>
> Key: FLINK-29696
> URL: https://issues.apache.org/jira/browse/FLINK-29696
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Kubernetes Operator
>Reporter: Yufei Zhang
>Priority: Minor
>
> In the operator documentation, the repo is added via:
> `helm repo add flink-operator-repo 
> https://downloads.apache.org/flink/flink-kubernetes-operator-/`
>  
> But later in the Operation-> Helm, the code instruct us to use 
>  
> `helm install flink-kubernetes-operator helm/flink-kubernetes-operator`
> Here we won't be able to download the helm chart since we are not using the 
> right repo.
>  
> You can assign this Jira to me and I can submit a PR to fix it~ 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819826#comment-17819826
 ] 

Gyula Fora commented on FLINK-34451:


Also one thing that occurred to me is that the issue could be with your HA 
storage. Is it possible that the new JM cannot access the HA storage directory? 
Can you change it to S3/Hdfs maybe?

 

During the upgrade the JM pod is killed and a new one is started, so it needs 
access to the stored files.

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819810#comment-17819810
 ] 

Gyula Fora commented on FLINK-34451:


I tried killing TMs and immediately bumping the restartNonce at the same time. 
You can see in the logs that the operator uses the HA metadata and picks up the 
latest checkpoint correctly (not the savepoint)

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819807#comment-17819807
 ] 

Gyula Fora commented on FLINK-34451:


Hm, so this really seems to be somehow adaptive scheduler related. My problem 
is that I cannot repro this locally. Would you please try with the 
`basic-checkpoint-ha.yaml` example? 

That has checkpointing and HA enabled; you can configure the adaptive scheduler 
and Flink 1.18 on top of it. I could not repro it this way.
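
The kind of configuration to add on top of that example (shown via the Java 
Configuration API purely for illustration; in the deployment YAML these would go 
under flinkConfiguration, and the values are illustrative):

{code:java}
import org.apache.flink.configuration.Configuration;

public class ReproConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Roughly the environment described in this ticket.
        conf.setString("jobmanager.scheduler", "adaptive");         // adaptive scheduler
        conf.setString("execution.checkpointing.interval", "60s");  // checkpoint every minute
        conf.setString("high-availability", "kubernetes");          // Kubernetes HA services

        System.out.println(conf);
    }
}
{code}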

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819795#comment-17819795
 ] 

Gyula Fora edited comment on FLINK-34451 at 2/22/24 8:12 PM:
-

I did not mean to turn off HA but only to reduce the JM replicas to 1


was (Author: gyfora):
I did not mean to turn off HA but only to reduce the replicas to 1

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819795#comment-17819795
 ] 

Gyula Fora commented on FLINK-34451:


I did not mean to turn off HA but only to reduce the replicas to 1

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819783#comment-17819783
 ] 

Gyula Fora commented on FLINK-34451:


Could this be related to the JobManager HA setup? Instead of 2 replicas, change to 
1 and try again?

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819567#comment-17819567
 ] 

Gyula Fora commented on FLINK-34451:


I am only asking because there have been fixes / improvements between 1.18.0 and 
1.18.1.

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819564#comment-17819564
 ] 

Gyula Fora commented on FLINK-34451:


Which 1.18 version are you using? I have only tried to repro this with 1.18.1 
(latest 1.18 release) but failed to repro it.

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-22 Thread Gyula Fora (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34451 ]


Gyula Fora deleted comment on FLINK-34451:


was (Author: gyfora):
[~alexdchoffer] so, just to confirm:

This issue doesn't occur with Flink 1.18? (even with the adaptive scheduler)

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-22 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819563#comment-17819563
 ] 

Gyula Fora commented on FLINK-34451:


[~alexdchoffer] so, just to confirm:

This issue doesn't occur with Flink 1.18? (even with the adaptive scheduler)

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34438) Kubernetes Operator doesn't wait for TaskManager deletion in native mode

2024-02-20 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34438.
--
Fix Version/s: kubernetes-operator-1.8.0
   Resolution: Fixed

merged to main a8fd19429e93428e8a2498c32def24aa8ebbd4c4

> Kubernetes Operator doesn't wait for TaskManager deletion in native mode
> 
>
> Key: FLINK-34438
> URL: https://issues.apache.org/jira/browse/FLINK-34438
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0, kubernetes-operator-1.6.1, 
> kubernetes-operator-1.8.0
>Reporter: Mate Czagany
>Assignee: Mate Czagany
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> This issue was partly fixed in FLINK-32334 but native mode was not included 
> in the fix.
> I don't see any downsides with adding the same check to native deployment 
> mode, which would make sure that all TaskManagers were deleted when we shut 
> down a Flink cluster.
> There should also be some logs suggesting that the timeout was exceeded 
> instead of silently returning when waiting for the cluster to shut down.
> An issue was also mentioned on the mailing list which seems to be related to 
> this: [https://lists.apache.org/thread/4gwj4ob4n9zg7b90vnqohj8x1p0bb5cb]
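
For illustration only, a rough sketch (not the operator's actual code) of what a 
wait-with-timeout for TaskManager deletion in native mode could look like. It assumes a 
fabric8 KubernetesClient and the "component=taskmanager" label that Flink's native 
Kubernetes integration places on TaskManager pods; all names are placeholders.

{code:java}
import io.fabric8.kubernetes.client.KubernetesClient;

import java.time.Duration;
import java.time.Instant;

/** Illustrative sketch: wait until all TaskManager pods of a cluster are gone. */
public final class TaskManagerShutdownWaiter {

    private TaskManagerShutdownWaiter() {}

    public static boolean waitForTaskManagerDeletion(
            KubernetesClient client, String namespace, String clusterName, Duration timeout)
            throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            int remaining =
                    client.pods()
                            .inNamespace(namespace)
                            // labels set by Flink's native Kubernetes integration
                            .withLabel("app", clusterName)
                            .withLabel("component", "taskmanager")
                            .list()
                            .getItems()
                            .size();
            if (remaining == 0) {
                return true;
            }
            Thread.sleep(1000);
        }
        // Surface the timeout instead of returning silently, as the ticket suggests.
        System.err.println(
                "Timed out waiting for TaskManager pods of " + clusterName + " to be deleted");
        return false;
    }
}
{code}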



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34213) Consider using accumulated busy time instead of busyMsPerSecond

2024-02-20 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34213.
--
Fix Version/s: kubernetes-operator-1.8.0
   Resolution: Fixed

merged to main 0a7588c084e7197f73e659e763f4d82d2d94ae76

> Consider using accumulated busy time instead of busyMsPerSecond
> ---
>
> Key: FLINK-34213
> URL: https://issues.apache.org/jira/browse/FLINK-34213
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Gyula Fora
>Priority: Minor
> Fix For: kubernetes-operator-1.8.0
>
>
> We might achieve much better accuracy if we used the accumulated busy time 
> metrics from Flink, instead of the momentarily collected ones.
> We would use the diff between the last accumulated and the current 
> accumulated busy time.
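
A minimal sketch (illustrative only; the class and parameter names are not from the 
ticket) of deriving the average busyness of a metric window from two readings of the 
accumulated busy-time counter, instead of averaging instantaneous busyMsPerSecond samples:

{code:java}
/** Illustrative sketch: window-average busyness from accumulated busy time. */
public final class BusyTimeWindow {

    private BusyTimeWindow() {}

    /**
     * @param accumulatedBusyStartMs accumulated busy time (ms) observed at the window start
     * @param accumulatedBusyEndMs accumulated busy time (ms) observed at the window end
     * @param windowLengthMs wall-clock length of the window in milliseconds
     * @return average busy milliseconds per second over the whole window
     */
    public static double averageBusyMsPerSecond(
            long accumulatedBusyStartMs, long accumulatedBusyEndMs, long windowLengthMs) {
        if (windowLengthMs <= 0) {
            throw new IllegalArgumentException("windowLengthMs must be positive");
        }
        double busyDeltaMs = accumulatedBusyEndMs - accumulatedBusyStartMs;
        // busy ms per wall-clock second, comparable to the busyMsPerSecond metric
        return busyDeltaMs / (windowLengthMs / 1000.0);
    }
}
{code}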



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34266) Output ratios should be computed over the whole metric window instead of averaged

2024-02-20 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34266.
--
Fix Version/s: kubernetes-operator-1.8.0
   Resolution: Fixed

merged to main 0a7588c084e7197f73e659e763f4d82d2d94ae76

> Output ratios should be computed over the whole metric window instead of 
> averaged
> -
>
> Key: FLINK-34266
> URL: https://issues.apache.org/jira/browse/FLINK-34266
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Critical
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Currently, output ratios are computed during metric collection based on the 
> current in/out metrics and stored as part of the collected metrics.
> During evaluation, the previously computed output ratios are then averaged 
> together over the metric window. This, however, leads to incorrect results 
> due to the nature of the computation and averaging.
> Example:
> Let's look at a window operator that simply sorts and re-emits events in 
> windows. During the window collection phase, the output ratio will be computed 
> and stored as 0. During the window computation, the output ratio will be 
> last_input_rate / window_size. Depending on the last input rate observation, 
> this can be off in either direction when averaged.
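
A small self-contained illustration (the numbers are made up, not from the ticket) of how 
averaging per-sample output ratios diverges from computing the ratio over the whole metric 
window:

{code:java}
/** Illustrative sketch: per-sample ratio average vs. whole-window ratio. */
public final class OutputRatioExample {

    public static void main(String[] args) {
        // Per-sample (input rate, output rate) observations inside one metric window,
        // e.g. a windowed operator that emits nothing until the window fires.
        double[][] samples = {{1000, 0}, {1000, 0}, {500, 3000}};

        double sumOfRatios = 0;
        double totalIn = 0;
        double totalOut = 0;
        for (double[] sample : samples) {
            sumOfRatios += sample[1] / sample[0];
            totalIn += sample[0];
            totalOut += sample[1];
        }

        // Averaging the per-sample ratios: (0 + 0 + 6) / 3 = 2.0
        System.out.println("average of ratios  = " + sumOfRatios / samples.length);
        // Ratio over the whole window: 3000 / 2500 = 1.2
        System.out.println("whole-window ratio = " + totalOut / totalIn);
    }
}
{code}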



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33244) Not Able To Pass the Configuration On Flink Session

2024-02-19 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-33244:
--

Assignee: Zhenqiu Huang

> Not Able To Pass the Configuration On Flink Session
> ---
>
> Key: FLINK-33244
> URL: https://issues.apache.org/jira/browse/FLINK-33244
> Project: Flink
>  Issue Type: Bug
>  Components: flink-contrib, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: Amarjeet Singh
>Assignee: Zhenqiu Huang
>Priority: Critical
> Fix For: 1.17.1
>
>
> Hi,
> I have tried configuring flink run with -D options like 
> -Dmetrics.reporter=promgateway \
> -Dmetrics.reporter.promgateway.jobName: flink_test_outside
> but these configuration options are not picked up.
> The same happens with the Flink Kubernetes Operator:
> I am not able to configure it through the custom resource using 
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: flink-job-test
> spec:
>   deploymentName: flink-session-cluster
>   restartNonce: 11
>   flinkConfiguration:
>     # Flink Config Overrides
>     kubernetes.operator.job.restart.failed: "true"
>     metrics.reporters: "promgateway"
>     metrics.reporter.promgateway.jobName: "flink_test_outside"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28645) Clean up logging in FlinkService / Reconciler

2024-02-19 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-28645:
--

Assignee: Zhenqiu Huang

> Clean up logging in FlinkService / Reconciler
> -
>
> Key: FLINK-28645
> URL: https://issues.apache.org/jira/browse/FLINK-28645
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Zhenqiu Huang
>Priority: Critical
>
> With the introduction of standalone mode, logging in the service 
> implementation / reconcilers became a bit chaotic and often redundant.
> We should ensure that we log consistently around cluster operations such as 
> cancellation, deletion, submission etc.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34451) [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach

2024-02-16 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818126#comment-17818126
 ] 

Gyula Fora commented on FLINK-34451:


Before we can investigate the root cause, it would be great to get some extra 
information:
 # Can you please share your spec to see the configuration / HA setup? (could 
this be related to multiple job managers?)
 # In your repro scenario, does it happen all the time?
 # Does this affect Kubernetes Operator 1.7.0 (or 1.8-SNAPSHOT if possible)?
 # Does this affect Flink 1.17 as well?
 # Does it affect only the adaptive scheduler?

 

I tried reproducing this in the basic-checkpoint-ha example (with Flink 1.18.0 
and the adaptive scheduler) but I never hit any issue with either savepoint or 
last-state upgrades, whether TMs were failing or not. 

 

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> --
>
> Key: FLINK-34451
> URL: https://issues.apache.org/jira/browse/FLINK-34451
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>Reporter: Alex Hoffer
>Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUSPENDED   | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Status | Info| UPGRADING   | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> 
> Event  | Info| SUBMIT  | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.

[jira] [Closed] (FLINK-34439) Move chown operations to COPY commands in Dockerfile

2024-02-14 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34439.
--
Fix Version/s: kubernetes-operator-1.8.0
   Resolution: Fixed

merged to main 6cab745df9d0a742ab15f341bdf69c8e802b2a39

> Move chown operations to COPY commands in Dockerfile
> 
>
> Key: FLINK-34439
> URL: https://issues.apache.org/jira/browse/FLINK-34439
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Assignee: Mate Czagany
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> We can lower the size of the output operator container image if we don't run 
> 'chown' commands in separate RUN commands inside the Dockerfile, but instead 
> use the '--chown' argument of the COPY command.
> Using 'RUN chown...' will copy all the files affected with their whole size 
> to a new layer, duplicating the previous files from the COPY command.
> Example:
> {code:java}
> $ docker image history ghcr.io/apache/flink-kubernetes-operator:ccb10b8
> ...
> <missing>      3 months ago  RUN /bin/sh -c chown -R flink:flink $FLINK...  
> 116MB       buildkit.dockerfile.v0
> ... {code}
> This would mean a 20% reduction in image size.
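
For illustration, the change boils down to something like the following (a generic sketch 
with placeholder file names, not the operator's actual Dockerfile):

{code}
# Before: COPY creates one layer, then RUN chown duplicates the files into another
COPY flink-kubernetes-operator.jar $FLINK_HOME/
RUN chown -R flink:flink $FLINK_HOME

# After: ownership is set while copying, so no duplicate layer is created
COPY --chown=flink:flink flink-kubernetes-operator.jar $FLINK_HOME/
{code}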



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34439) Move chown operations to COPY commands in Dockerfile

2024-02-13 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-34439:
--

Assignee: Mate Czagany

> Move chown operations to COPY commands in Dockerfile
> 
>
> Key: FLINK-34439
> URL: https://issues.apache.org/jira/browse/FLINK-34439
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Assignee: Mate Czagany
>Priority: Minor
>
> We can lower the size of the output operator container image if we don't run 
> 'chown' commands in separate RUN commands inside the Dockerfile, but instead 
> use the '--chown' argument of the COPY command.
> Using 'RUN chown...' will copy all the files affected with their whole size 
> to a new layer, duplicating the previous files from the COPY command.
> Example:
> {code:java}
> $ docker image history ghcr.io/apache/flink-kubernetes-operator:ccb10b8
> ...
> <missing>      3 months ago  RUN /bin/sh -c chown -R flink:flink $FLINK...  
> 116MB       buildkit.dockerfile.v0
> ... {code}
> This would mean a 20% reduction in image size.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34438) Kubernetes Operator doesn't wait for TaskManager deletion in native mode

2024-02-13 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-34438:
--

Assignee: Mate Czagany

> Kubernetes Operator doesn't wait for TaskManager deletion in native mode
> 
>
> Key: FLINK-34438
> URL: https://issues.apache.org/jira/browse/FLINK-34438
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0, kubernetes-operator-1.6.1, 
> kubernetes-operator-1.8.0
>Reporter: Mate Czagany
>Assignee: Mate Czagany
>Priority: Major
>
> This issue was partly fixed in FLINK-32334 but native mode was not included 
> in the fix.
> I don't see any downsides with adding the same check to native deployment 
> mode, which would make sure that all TaskManagers were deleted when we shut 
> down a Flink cluster.
> There should also be some logs suggesting that the timeout was exceeded 
> instead of silently returning when waiting for the cluster to shut down.
> An issue was also mentioned on the mailing list which seems to be related to 
> this: [https://lists.apache.org/thread/4gwj4ob4n9zg7b90vnqohj8x1p0bb5cb]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34213) Consider using accumulated busy time instead of busyMsPerSecond

2024-02-13 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-34213:
--

Assignee: Gyula Fora

> Consider using accumulated busy time instead of busyMsPerSecond
> ---
>
> Key: FLINK-34213
> URL: https://issues.apache.org/jira/browse/FLINK-34213
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Gyula Fora
>Priority: Minor
>
> We might achieve much better accuracy if we used the accumulated busy time 
> metrics from Flink, instead of the momentarily collected ones.
> We would use the diff between the last accumulated and the current 
> accumulated busy time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

