[jira] [Commented] (FLINK-32592) (Stream)ExEnv#initializeContextEnvironment isn't thread-safe

2023-07-21 Thread Fabio Wanner (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17745668#comment-17745668
 ] 

Fabio Wanner commented on FLINK-32592:
--

We did a few hundred deployments and have not run into the issue again. Looks 
resolved! Thanks a lot! 

> (Stream)ExEnv#initializeContextEnvironment isn't thread-safe
> 
>
> Key: FLINK-32592
> URL: https://issues.apache.org/jira/browse/FLINK-32592
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.15.4, 1.18.0, 1.17.1
>Reporter: Fabio Wanner
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> *Context*
> We are using the flink-k8s-operator to deploy multiple jobs (up to 32) to a 
> single session cluster. The job submissions done by the operator happen 
> concurrently, basically at the same time.
> Operator version: 1.5.0
> Flink versions: 1.15.4, 1.17.1, 1.18 (master@f37d41cf)
> *Problem*
> Rarely (about once every 50 deployments), one of the jobs is not executed. In 
> the following incident, 4 jobs were deployed at the same time:
>  * gorner-task-staging-e5730831
>  * gorner-facility-staging-e5730831
>  * gorner-aepp-staging-e5730831
>  * gorner-session-staging-e5730831
>  
> The operator submits the jobs, and they all get a valid jobID:
> {code:java}
> 2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO 
> ][aelps-staging/gorner-task-staging-e5730831] Submitting job: 
> 4968b186061e44390002 to session cluster.
> 2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO 
> ][aelps-staging/gorner-facility-staging-e5730831] Submitting job: 
> 91a5260d916c4dff0002 to session cluster.
> 2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO 
> ][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: 
> 103c0446e14749a10002 to session cluster.
> 2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO 
> ][aelps-staging/gorner-session-staging-e5730831] Submitting job: 
> de59304d370b4b8e0002 to session cluster.
> {code}
> In the cluster, the JarRunHandler's handleRequest() method receives the 
> requests, and all 4 jobIDs are present (all args etc. are correct as well):
> {code:java}
> 2023-07-14 10:25:35,320 WARN  
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - 
> handleRequest - requestBody.jobId: 4968b186061e44390002
> 2023-07-14 10:25:35,321 WARN  
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - 
> handleRequest - requestBody.jobId: de59304d370b4b8e0002
> 2023-07-14 10:25:35,321 WARN  
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - 
> handleRequest - requestBody.jobId: 91a5260d916c4dff0002
> 2023-07-14 10:25:35,321 WARN  
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - 
> handleRequest - requestBody.jobId: 103c0446e14749a10002
> {code}
> But once the EmbeddedExecutor's submitAndGetJobClientFuture() method is 
> called, instead of one call per jobID we get 4 calls with one of the 
> jobIDs appearing twice:
> {code:java}
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - optJobId: Optional[4968b186061e44390002]
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - optJobId: Optional[103c0446e14749a10002]
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - optJobId: Optional[de59304d370b4b8e0002]
> 2023-07-14 10:25:35,721 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - optJobId: Optional[de59304d370b4b8e0002]
> {code}
> In case it matters: the jobGraph obtained does not match the jobID. We get 
> de59304d370b4b8e0002 twice, but the jobGraph for this jobID is 
> never returned by getJobGraph() in 
> EmbeddedExecutor.submitAndGetJobClientFuture().
> This will then lead to the job already existing:
> {code:java}
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - submittedJobIds: []
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - submittedJobIds: []
> 2023-07-14 10:25:35,616 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - execute - submittedJobIds: []
> 2023-07-14 10:25:35,721 WARN  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> 

[jira] [Resolved] (FLINK-32552) Mixed up Flink session job deployments

2023-07-19 Thread Fabio Wanner (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabio Wanner resolved FLINK-32552.
--
Release Note: Not a bug in the Flink k8s operator.
  Resolution: Not A Bug

> Mixed up Flink session job deployments
> --
>
> Key: FLINK-32552
> URL: https://issues.apache.org/jira/browse/FLINK-32552
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Fabio Wanner
>Priority: Major
>
> *Context*
> As part of our end-to-end tests, we regularly deploy all of our Flink session 
> jobs to a staging environment. Some of the jobs are bundled together in one 
> Helm chart and are therefore deployed at the same time. There are around 40 
> individual Flink jobs (running on the same Flink session cluster). Each e2e 
> test run uses its own session cluster. The problems described below occur 
> rarely (roughly 1 in 50 runs).
> *Problem*
> Occasionally, the operator seems to "mix up" the deployments. This shows up 
> in the Flink cluster logs as multiple {{Received JobGraph submission '<job name>' (<job id>)}} 
> entries from different jobs with the same job_id. This results in errors such as 
> {{DuplicateJobSubmissionException}} or {{ClassNotFoundException}}.
> It is also visible in the FlinkSessionJob resource: status.jobStatus.jobName 
> does not match the expected name of the job being deployed (the job name 
> is passed to the application as an argument).
> So far we have been unable to reproduce the error reliably.
> *Details*
> The following lines show the status of 3 jobs from the viewpoint of the 
> Flink cluster dashboard and the FlinkSessionJob resource:
>  
> *aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
>  * State: Restarting
>  * ID: a7d36f3881f943a2
>  * Exceptions: Cannot load user class: 
> aelps.pipelines.aletsch.smc.SMCUrlMapper
> FlinkSessionJob Resource:
>  * State: RUNNING
>  * jobId: a1221c743367497b0002
>  * uid: a1221c74-3367-497b-ad2f-8793ab23919d
>  
> *aletsch_mat_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
>  * State: -
>  * ID: -
> FlinkSessionJob Resource:
>  * State: UPGRADING
>  * jobId: -
>  * uid: a7d36f38-81f9-43a0-898f-19b950430e9d
> Flink K8s Operator:
>  * Exceptions: DuplicateJobSubmissionException: Job has already been 
> submitted.
>  
> *aletsch_wp_wafer_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
>  * State: Running
>  * ID: e692c2dfaa18441c0002
>  * Exceptions: -
> FlinkSessionJob Resource:
>  * State: RUNNING
>  * jobId: e692c2dfaa18441c0002
>  * uid: e692c2df-aa18-441c-a352-88aefa9a3017
> As we can see, the *aletsch_smc* job is supposedly running according to the 
> FlinkSessionJob resource, but it is crash-looping in the cluster, and its 
> jobID matches the uid of the {*}aletsch_mat{*} resource, while 
> *aletsch_mat* is not running at all. The following logs also show some 
> suspicious entries: there are several {{Received JobGraph submission}} entries 
> from different jobs with the same jobID.
>  
> *Logs*
> The logs are filtered by the 3 jobIds from above.
>  
> JobID: a7d36f3881f943a2
> {code:bash}
> Flink Cluster
> ...
> 2023-07-06 10:23:50,552 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
> aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 
> (a7d36f3881f943a2) switched from state RUNNING to RESTARTING.
> 2023-07-06 10:23:50   file: 
> '/tmp/tm_10.0.11.159:6122-e9fadc/blobStorage/job_a7d36f3881f943a2/blob_p-40c7a30adef8868254191d2cf2dbc4cb7ab46f0d-8a02a0583d91c5e8e6c94f378aa444c2'
>  (valid JAR)
> 2023-07-06 10:23:50,522 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Received resource requirements from job 
> a7d36f3881f943a2: 
> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
> numberOfRequiredSlots=4}]
> 2023-07-06 10:23:50,522 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Received resource requirements from job 
> a7d36f3881f943a2: 
> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
> numberOfRequiredSlots=3}]
> 2023-07-06 10:23:50,522 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Received resource requirements from job 
> a7d36f3881f943a2: 
> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
> numberOfRequiredSlots=2}]
> 2023-07-06 10:23:50,522 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Received resource requirements from job 
> a7d36f3881f943a2: 
> 

[jira] [Commented] (FLINK-32592) (Stream)ExEnv#initializeContextEnvironment isn't thread-safe

2023-07-16 Thread Fabio Wanner (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743495#comment-17743495
 ] 

Fabio Wanner commented on FLINK-32592:
--

Hey! Sorry for the late response, and thanks a lot for looking into the issue! As 
the job submission is done by the official Flink k8s operator, we do not have 
direct control over how the jobs are run.

I am currently on vacation, but I will be able to verify your fix on Wednesday!


[jira] [Commented] (FLINK-32592) Mixed-up job execution on concurrent job submission

2023-07-14 Thread Fabio Wanner (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743136#comment-17743136
 ] 

Fabio Wanner commented on FLINK-32592:
--

I would be happy to assist in fixing this issue, but could really use some 
pointers to find the root cause of the problem...


[jira] [Commented] (FLINK-32552) Mixed up Flink session job deployments

2023-07-14 Thread Fabio Wanner (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743135#comment-17743135
 ] 

Fabio Wanner commented on FLINK-32552:
--

BTW: as a really bad workaround, the problem can be "fixed" by adding 
sufficiently large random delays in the AbstractFlinkResourceReconciler's 
reconcile() method on first deployment...
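For illustration only, a sketch of the kind of random delay meant here, written as a standalone helper rather than as a patch to the operator; `JitteredSubmit.submitWithJitter` and the `maxDelayMillis` bound are hypothetical names. Spreading submissions out in time only lowers the probability that two of them overlap inside the cluster; it cannot rule it out, which is why it is a bad workaround.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

public class JitteredSubmit {

    /**
     * Hypothetical helper: waits a random delay in [0, maxDelayMillis) and then runs the
     * submission. This narrows, but does not close, the window in which concurrent
     * submissions can interfere with each other.
     */
    public static <T> T submitWithJitter(Supplier<T> submission, long maxDelayMillis)
            throws InterruptedException {
        if (maxDelayMillis > 0) {
            // nextLong(bound) returns a value in [0, bound); the bound must be positive.
            Thread.sleep(ThreadLocalRandom.current().nextLong(maxDelayMillis));
        }
        return submission.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Placeholder supplier standing in for the real job submission.
        String jobId = submitWithJitter(() -> "de59304d370b4b8e0002", 500);
        System.out.println("submitted " + jobId);
    }
}
```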


[jira] [Comment Edited] (FLINK-32552) Mixed up Flink session job deployments

2023-07-14 Thread Fabio Wanner (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743096#comment-17743096
 ] 

Fabio Wanner edited comment on FLINK-32552 at 7/14/23 12:27 PM:


It's definitely not a bug in the operator:

I deployed 4 different jobs at the same time and observed the following:
JarRunHandler.handleRequest() is called 4 times, and the requests contain 4 
distinct IDs for the 4 jobs, but the EmbeddedExecutor's 
submitAndGetJobClientFuture() method (also called 4 times) sees only 3 distinct 
optJobIds plus one duplicate. This leads to different flavors of the problem 
described in this bug ticket (depending on the exact timing of the parallel job 
launches). BTW: Flink versions 1.15.4 and 1.17.1 show this issue.

I have opened a new bug ticket with the relevant information: 
https://issues.apache.org/jira/browse/FLINK-32592


was (Author: JIRAUSER298842):
It's definitely not a bug in the operator:

I deployed 4 different jobs at the same time and observed the following:
JarRunHandler.handleRequest() is called 4 times, and the requests contain 4 
distinct IDs for the 4 jobs, but the EmbeddedExecutor's 
submitAndGetJobClientFuture() method (also called 4 times) sees only 3 distinct 
optJobIds plus one duplicate. This leads to different flavors of the problem 
described in this bug ticket (depending on the exact timing of the parallel job 
launches). BTW: Flink versions 1.15.4 and 1.17.1 show this issue.

I will open a new bug ticket with the relevant information and link it here.


[jira] [Created] (FLINK-32592) Mixed-up job execution on concurrent job submission

2023-07-14 Thread Fabio Wanner (Jira)
Fabio Wanner created FLINK-32592:


 Summary: Mixed-up job execution on concurrent job submission
 Key: FLINK-32592
 URL: https://issues.apache.org/jira/browse/FLINK-32592
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.17.1, 1.15.4, 1.18.0
Reporter: Fabio Wanner


*Context*

We are using the flink-k8s-operator to deploy multiple jobs (up to 32) to a 
single session cluster. The job submissions done by the operator happen 
concurrently, basically at the same time.

Operator version: 1.5.0

Flink versions: 1.15.4, 1.17.1, 1.18 (master@f37d41cf)

*Problem*

Rarely (about once every 50 deployments), one of the jobs is not executed. In 
the following incident, 4 jobs were deployed at the same time:
 * gorner-task-staging-e5730831
 * gorner-facility-staging-e5730831
 * gorner-aepp-staging-e5730831
 * gorner-session-staging-e5730831

 
The operator submits the jobs, and they all get a valid jobID:
{code:java}
2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO 
][aelps-staging/gorner-task-staging-e5730831] Submitting job: 
4968b186061e44390002 to session cluster.
2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO 
][aelps-staging/gorner-facility-staging-e5730831] Submitting job: 
91a5260d916c4dff0002 to session cluster.
2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO 
][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: 
103c0446e14749a10002 to session cluster.
2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO 
][aelps-staging/gorner-session-staging-e5730831] Submitting job: 
de59304d370b4b8e0002 to session cluster.
{code}
In the cluster, the JarRunHandler's handleRequest() method receives the requests, 
and all 4 jobIDs are present (all args etc. are correct as well):
{code:java}
2023-07-14 10:25:35,320 WARN  
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest 
- requestBody.jobId: 4968b186061e44390002
2023-07-14 10:25:35,321 WARN  
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest 
- requestBody.jobId: de59304d370b4b8e0002
2023-07-14 10:25:35,321 WARN  
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest 
- requestBody.jobId: 91a5260d916c4dff0002
2023-07-14 10:25:35,321 WARN  
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest 
- requestBody.jobId: 103c0446e14749a10002
{code}
But once the EmbeddedExecutor's submitAndGetJobClientFuture() method is called, 
instead of one call per jobID we get 4 calls with one of the jobIDs appearing twice:
{code:java}
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - optJobId: Optional[4968b186061e44390002]
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - optJobId: Optional[103c0446e14749a10002]
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - optJobId: Optional[de59304d370b4b8e0002]
2023-07-14 10:25:35,721 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - optJobId: Optional[de59304d370b4b8e0002]
{code}
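The duplicate among the optJobId lines above can also be spotted mechanically. The following minimal sketch scans log lines for the `optJobId: Optional[...]` pattern and reports every ID seen more than once. The pattern is taken from the WARN lines quoted here, which appear to be ad-hoc diagnostic logging added for this investigation rather than standard Flink output; `DuplicateJobIdScan` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DuplicateJobIdScan {

    // Matches the diagnostic lines quoted above, e.g. "execute - optJobId: Optional[de59...]".
    private static final Pattern OPT_JOB_ID =
            Pattern.compile("optJobId: Optional\\[([0-9a-f]+)\\]");

    /** Returns every job ID that occurs more than once, in first-seen order. */
    public static List<String> duplicateJobIds(List<String> logLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = OPT_JOB_ID.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        List<String> duplicates = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                duplicates.add(e.getKey());
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "execute - optJobId: Optional[4968b186061e44390002]",
                "execute - optJobId: Optional[103c0446e14749a10002]",
                "execute - optJobId: Optional[de59304d370b4b8e0002]",
                "execute - optJobId: Optional[de59304d370b4b8e0002]");
        System.out.println(duplicateJobIds(lines)); // prints [de59304d370b4b8e0002]
    }
}
```

Comparing this result against the distinct IDs seen by JarRunHandler shows which submission was lost (here 91a5260d916c4dff0002 never reaches the executor).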
In case it matters: the jobGraph obtained does not match the jobID. We get 
de59304d370b4b8e0002 twice, but the jobGraph for this jobID is never 
returned by getJobGraph() in EmbeddedExecutor.submitAndGetJobClientFuture().

As a result, the job already exists by the time the fourth call is made:
{code:java}
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - submittedJobIds: []
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - submittedJobIds: []
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - submittedJobIds: []
2023-07-14 10:25:35,721 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - submittedJobIds: [de59304d370b4b8e0002]
{code}
But since the jobs are completely different, the execution fails, depending on 
the timing, with one of the following exceptions:
 * RestHandlerException: No jobs included in application
 * ClassNotFoundException: io.dectris.aelps.pipelines.gorner.facility.FacilityEventProcessor
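The pattern in the logs (distinct IDs at the REST handler, one duplicated ID at the executor) is what you would expect if per-submission context is published through shared static state and then copied into a thread-local. The following is a simplified, hypothetical Java model of that failure mode, not Flink's actual code: it assumes an initializer that writes a static field and then copies that field (rather than its own argument) into a ThreadLocal. ContextHolder, RaceDemo and the betweenWrites hook are illustrative names only; the hook exists solely to force the bad interleaving deterministically.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical holder: a process-wide default plus a per-thread override. */
final class ContextHolder {
    private static volatile Supplier<String> globalFactory = () -> "none";
    private static final ThreadLocal<Supplier<String>> threadFactory = new ThreadLocal<>();

    // Test hook run between the two writes, used only to force an interleaving.
    static volatile Runnable betweenWrites = () -> {};

    /** Racy: the thread-local copy is read back from the shared static field. */
    static void initRacy(Supplier<String> ctx) {
        globalFactory = ctx;
        betweenWrites.run(); // another thread may overwrite globalFactory here
        threadFactory.set(globalFactory);
    }

    /** Safe: the thread-local copy comes from the argument, never from shared state. */
    static void initSafe(Supplier<String> ctx) {
        globalFactory = ctx;
        betweenWrites.run();
        threadFactory.set(ctx);
    }

    /** The job this thread will submit: its own factory, else the global default. */
    static String currentJob() {
        Supplier<String> local = threadFactory.get();
        return (local != null ? local : globalFactory).get();
    }
}

final class RaceDemo {
    private RaceDemo() {}

    /** Runs two "submissions" with a forced interleaving; returns the job each one sees. */
    static String[] run(boolean safe) {
        Consumer<Supplier<String>> init = safe ? ContextHolder::initSafe : ContextHolder::initRacy;
        CountDownLatch aWroteGlobal = new CountDownLatch(1);
        CountDownLatch bDone = new CountDownLatch(1);
        String[] seen = new String[2];

        // Pause submission A right after its static write until B has fully initialized.
        ContextHolder.betweenWrites = () -> {
            if ("submit-A".equals(Thread.currentThread().getName())) {
                aWroteGlobal.countDown();
                awaitQuietly(bDone);
            }
        };
        Thread a = new Thread(() -> {
            init.accept(() -> "job-A");
            seen[0] = ContextHolder.currentJob();
        }, "submit-A");
        Thread b = new Thread(() -> {
            awaitQuietly(aWroteGlobal);
            init.accept(() -> "job-B");
            seen[1] = ContextHolder.currentJob();
            bDone.countDown();
        }, "submit-B");

        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ContextHolder.betweenWrites = () -> {};
        }
        return seen;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With run(false) both submissions end up with job-B (one job twice, the other lost, as in the executor log); with run(true) each keeps its own job. The point the model illustrates is that the thread-local value should come from the caller's argument, so no shared write can be observed between the store and the copy. Whether the actual Flink fix takes exactly this shape is not established by this report.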


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32552) Mixed up Flink session job deployments

2023-07-14 Thread Fabio Wanner (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743096#comment-17743096
 ] 

Fabio Wanner edited comment on FLINK-32552 at 7/14/23 10:43 AM:


It's definitely not a bug in the operator:

I deployed 4 different jobs at the same time and observed the following: 
JarRunHandler.handleRequest() is called 4 times, and the 4 requests contain 4 
distinct IDs for the 4 jobs. However, EmbeddedExecutor's 
submitAndGetJobClientFuture() method (also called 4 times) sees only 3 distinct 
optJobIds plus one duplicate, which leads to different flavors of the problem 
described in this bug ticket (depending on the exact timing of the parallel job 
launches). By the way, both Flink 1.15.4 and 1.17.1 show this issue.

I will open a new bug ticket with the relevant information and link it here.


was (Author: JIRAUSER298842):
It's definitely not a bug in the operator:


I deployed 4 different jobs at the same time and observed the following: 
JarRunHandler.handleRequest() is called 4 times, and the 4 requests contain 4 
distinct IDs for the 4 jobs. However, EmbeddedExecutor's 
submitAndGetJobClientFuture() method (also called 4 times) sees only 3 distinct 
optJobIds plus one duplicate, which leads to different flavors of the problem 
described in this bug ticket (depending on the exact timing of the parallel job 
launches). 

I will open a new bug ticket with the relevant information and link it here.

> Mixed up Flink session job deployments
> --
>
> Key: FLINK-32552
> URL: https://issues.apache.org/jira/browse/FLINK-32552
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Fabio Wanner
>Priority: Major
>
> *Context*
> As part of our end-to-end tests, we regularly deploy all of our Flink session 
> jobs to a staging environment. Some of the jobs are bundled together in one 
> Helm chart and are therefore deployed at the same time. There are around 40 
> individual Flink jobs, all running on the same Flink session cluster. Each e2e 
> test run gets its own session cluster. The problems described below occur 
> rarely (maybe 1 in ~50 runs).
> *Problem*
> Occasionally the operator seems to "mix up" the deployments. This can be seen 
> in the Flink cluster logs, where multiple 
> {{Received JobGraph submission '<JOB NAME>' (<JOB ID>)}} entries are logged 
> for different jobs with the same job ID. This results in errors such as:
> {{DuplicateJobSubmissionException}} or {{ClassNotFoundException}}.
> It's also visible in the FlinkSessionJob resource: status.jobStatus.jobName 
> does not match the expected name of the job being deployed (the job name is 
> passed to the application as an argument).
> So far we have been unable to reliably reproduce the error.
> *Details*
> The following lines show the status of 3 jobs from the point of view of the 
> Flink cluster dashboard and the FlinkSessionJob resource:
>  
> *aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
>  * State: Restarting
>  * ID: a7d36f3881f943a2
>  * Exceptions: Cannot load user class: 
> aelps.pipelines.aletsch.smc.SMCUrlMapper
> FlinkSessionJob resource:
>  * State: RUNNING
>  * jobId: a1221c743367497b0002
>  * uid: a1221c74-3367-497b-ad2f-8793ab23919d
>  
> *aletsch_mat_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
>  * State: -
>  * ID: -
> FlinkSessionJob resource:
>  * State: UPGRADING
>  * jobId: -
>  * uid: a7d36f38-81f9-43a0-898f-19b950430e9d
> Flink K8s Operator:
>  * Exceptions: DuplicateJobSubmissionException: Job has already been 
> submitted.
>  
> *aletsch_wp_wafer_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
>  * State: Running
>  * ID: e692c2dfaa18441c0002
>  * Exceptions: -
> FlinkSessionJob resource:
>  * State: RUNNING
>  * jobId: e692c2dfaa18441c0002
>  * uid: e692c2df-aa18-441c-a352-88aefa9a3017
> As we can see, the *aletsch_smc* job is supposedly running according to the 
> FlinkSessionJob resource, but it is crash-looping in the cluster, and its job 
> ID matches the uid of the {*}aletsch_mat{*} resource. Meanwhile, *aletsch_mat* 
> is not running at all. The following logs also show some suspicious entries: 
> there are several {{Received JobGraph submission}} entries from different jobs 
> with the same job ID.
>  
> *Logs*
> The logs are filtered by the 3 jobIds from above.
>  
> JobID: a7d36f3881f943a2
> {code:bash}
> Flink Cluster
> ...
> 2023-07-06 10:23:50,552 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
> aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 
> (a7d36f3881f943a2) switched from state RUNNING to RESTARTING.
> 2023-07-06 10:23:50 

[jira] [Commented] (FLINK-32552) Mixed up Flink session job deployments

2023-07-14 Thread Fabio Wanner (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743096#comment-17743096
 ] 

Fabio Wanner commented on FLINK-32552:
--

It's definitely not a bug in the operator:


I deployed 4 different jobs at the same time and observed the following: 
JarRunHandler.handleRequest() is called 4 times, and the 4 requests contain 4 
distinct IDs for the 4 jobs. However, EmbeddedExecutor's 
submitAndGetJobClientFuture() method (also called 4 times) sees only 3 distinct 
optJobIds plus one duplicate, which leads to different flavors of the problem 
described in this bug ticket (depending on the exact timing of the parallel job 
launches). 

I will open a new bug ticket with the relevant information and link it here.


[jira] [Commented] (FLINK-32552) Mixed up Flink session job deployments

2023-07-12 Thread Fabio Wanner (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742453#comment-17742453
 ] 

Fabio Wanner commented on FLINK-32552:
--

I dug a bit deeper and found the following:

Whenever the error happens, one of the job deployments fails with this exception:

{code:java}
org.apache.flink.runtime.rest.handler.RestHandlerException: No jobs included in 
application.{code}

The operator then redeploys this job, and it runs without any problems. 

However, one of the other jobs deployed in parallel gets mixed up with the 
failed one and never starts running (it uses the JAR file of the failed job, 
leading to a ClassNotFoundException, or ends up with the same job ID and/or the 
same name).


[jira] [Updated] (FLINK-32552) Mixed up Flink session job deployments

2023-07-10 Thread Fabio Wanner (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabio Wanner updated FLINK-32552:
-
Description: 
*Context*

As part of our end-to-end tests, we regularly deploy all of our Flink session 
jobs to a staging environment. Some of the jobs are bundled together in one Helm 
chart and are therefore deployed at the same time. There are around 40 
individual Flink jobs, all running on the same Flink session cluster. Each e2e 
test run gets its own session cluster. The problems described below occur 
rarely (maybe 1 in ~50 runs).

*Problem*

Occasionally the operator seems to "mix up" the deployments. This can be seen in 
the Flink cluster logs, where multiple 
{{Received JobGraph submission '<JOB NAME>' (<JOB ID>)}} entries are logged for 
different jobs with the same job ID. This results in errors such as:

{{DuplicateJobSubmissionException}} or {{ClassNotFoundException}}.

It's also visible in the FlinkSessionJob resource: status.jobStatus.jobName does 
not match the expected name of the job being deployed (the job name is passed to 
the application as an argument).

So far we have been unable to reliably reproduce the error.
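Because the mix-up only shows up when submissions overlap closely (roughly 1 in 50 runs here), a reproduction attempt may benefit from releasing all submissions at the same instant instead of starting them one after another. A minimal sketch of such a burst harness in Java; BurstSubmitter is hypothetical, and submitJob stands for whatever performs one job submission in your setup (for example your own wrapper around the REST call), which is an assumption, not an operator or Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

/** Hypothetical stress harness: starts n submissions as close to simultaneously as possible. */
final class BurstSubmitter {
    private BurstSubmitter() {}

    /** Runs submitJob(0..n-1) on n threads released together; results are in index order. */
    static <T> List<T> burst(int n, IntFunction<T> submitJob) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        CountDownLatch ready = new CountDownLatch(n);
        CountDownLatch go = new CountDownLatch(1);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                final int index = i;
                futures.add(pool.submit(() -> {
                    ready.countDown();
                    go.await(); // every worker parks here until all are ready
                    return submitJob.apply(index);
                }));
            }
            ready.await(); // all n workers are parked on the start gate
            go.countDown(); // release them together
            List<T> results = new ArrayList<>();
            for (Future<T> f : futures) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Running such a burst repeatedly and checking that the job IDs returned by the (hypothetical) submitJob callback are pairwise distinct would approximate the helm-chart scenario of many jobs deployed at once.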

*Details*

The following lines show the status of 3 jobs from the point of view of the Flink 
cluster dashboard and the FlinkSessionJob resource:

 

*aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: Restarting
 * ID: a7d36f3881f943a2
 * Exceptions: Cannot load user class: aelps.pipelines.aletsch.smc.SMCUrlMapper

FlinkSessionJob resource:
 * State: RUNNING
 * jobId: a1221c743367497b0002
 * uid: a1221c74-3367-497b-ad2f-8793ab23919d

 

*aletsch_mat_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: -
 * ID: -

FlinkSessionJob resource:
 * State: UPGRADING
 * jobId: -
 * uid: a7d36f38-81f9-43a0-898f-19b950430e9d

Flink K8s Operator:
 * Exceptions: DuplicateJobSubmissionException: Job has already been submitted.

 

*aletsch_wp_wafer_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: Running
 * ID: e692c2dfaa18441c0002
 * Exceptions: -

FlinkSessionJob resource:
 * State: RUNNING
 * jobId: e692c2dfaa18441c0002
 * uid: e692c2df-aa18-441c-a352-88aefa9a3017

As we can see, the *aletsch_smc* job is supposedly running according to the 
FlinkSessionJob resource, but it is crash-looping in the cluster, and its job ID 
matches the uid of the {*}aletsch_mat{*} resource. Meanwhile, *aletsch_mat* is 
not running at all. The following logs also show some suspicious entries: there 
are several {{Received JobGraph submission}} entries from different jobs with 
the same job ID.

 

*Logs*

The logs are filtered by the 3 jobIds from above.

 

JobID: a7d36f3881f943a2
{code:bash}
Flink Cluster
...
2023-07-06 10:23:50,552 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 
(a7d36f3881f943a2) switched from state RUNNING to RESTARTING.
2023-07-06 10:23:50 file: 
'/tmp/tm_10.0.11.159:6122-e9fadc/blobStorage/job_a7d36f3881f943a2/blob_p-40c7a30adef8868254191d2cf2dbc4cb7ab46f0d-8a02a0583d91c5e8e6c94f378aa444c2'
 (valid JAR)
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=4}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=3}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=2}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=1}]
2023-07-06 10:23:50,512 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 
(a7d36f3881f943a2) switched from state RESTARTING to RUNNING.
2023-07-06 10:23:48,979 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Clearing resource requirements of job a7d36f3881f943a2
2023-07-06 10:23:48,853 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 

[jira] [Commented] (FLINK-32552) Mixed up Flink session job deployments

2023-07-06 Thread Fabio Wanner (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740873#comment-17740873
 ] 

Fabio Wanner commented on FLINK-32552:
--

I would very much appreciate some pointers on where this issue could originate. 
So far I have been unable to find the root cause (or to reproduce it reliably). 
Also, we are running the operator at version 1.5 with this fix included: 
https://issues.apache.org/jira/browse/FLINK-32412


[jira] [Updated] (FLINK-32552) Mixed up Flink session job deployments

2023-07-06 Thread Fabio Wanner (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabio Wanner updated FLINK-32552:
-
Description: 
*Context*

As part of our end-to-end tests, we regularly deploy all of our Flink session 
jobs to a staging environment. Some of the jobs are bundled together in one Helm 
chart and are therefore deployed at the same time. There are around 40 
individual Flink jobs, all running on the same Flink session cluster. Each e2e 
test run gets its own session cluster. The problems described below occur 
rarely (maybe 1 in ~50 runs).

*Problem*

Occasionally the operator seems to "mix up" the deployments. This can be seen in 
the Flink cluster logs, where multiple 
{{Received JobGraph submission '<JOB NAME>' (<JOB ID>)}} entries are logged for 
different jobs with the same job ID. This results in errors such as:

{{DuplicateJobSubmissionException}} or {{ClassNotFoundException}}.

It's also visible in the FlinkSessionJob resource: status.jobStatus.jobName does 
not match the expected name of the job being deployed (the job name is passed to 
the application as an argument).

So far we have been unable to reliably reproduce the error.

*Details*

The following lines show the status of 3 jobs from the point of view of the Flink 
cluster dashboard and the FlinkSessionJob resource:

 

*aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: Restarting
 * ID: a7d36f3881f943a2
 * Exceptions: Cannot load user class: aelps.pipelines.aletsch.smc.SMCUrlMapper

FlinkSessionJob resource:
 * State: RUNNING
 * jobId: a1221c743367497b0002
 * uid: a1221c74-3367-497b-ad2f-8793ab23919d

 

*aletsch_mat_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: -
 * ID: -

FlinkSessionJob resource:
 * State: UPGRADING
 * jobId: -
 * uid: a7d36f38-81f9-43a0-898f-19b950430e9d

Flink K8s Operator:
 * Exceptions: DuplicateJobSubmissionException: Job has already been submitted.

 

*aletsch_wp_wafer_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: Running
 * ID: e692c2dfaa18441c0002
 * Exceptions: -

FlinkSessionJob resource:
 * State: RUNNING
 * jobId: e692c2dfaa18441c0002
 * uid: e692c2df-aa18-441c-a352-88aefa9a3017

As we can see, the *aletsch_smc* job is supposedly running according to the 
FlinkSessionJob resource, but it is crash-looping in the cluster, and its job ID 
matches the uid of the {*}aletsch_mat{*} resource. Meanwhile, *aletsch_mat* is 
not running at all. The following logs also show some suspicious entries: there 
are several {{Received JobGraph submission}} entries from different jobs with 
the same job ID.

 

*Logs*

The logs are filtered by the 3 jobIds from above.

 

JobID: a7d36f3881f943a2
{code:bash}
Flink Cluster
...
2023-07-06 10:23:50,552 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 
(a7d36f3881f943a2) switched from state RUNNING to RESTARTING.
2023-07-06 10:23:50 file: 
'/tmp/tm_10.0.11.159:6122-e9fadc/blobStorage/job_a7d36f3881f943a2/blob_p-40c7a30adef8868254191d2cf2dbc4cb7ab46f0d-8a02a0583d91c5e8e6c94f378aa444c2'
 (valid JAR)
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=4}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=3}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=2}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=1}]
2023-07-06 10:23:50,512 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 
(a7d36f3881f943a2) switched from state RESTARTING to RUNNING.
2023-07-06 10:23:48,979 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Clearing resource requirements of job a7d36f3881f943a2
2023-07-06 10:23:48,853 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 

[jira] [Created] (FLINK-32552) Mixed up Flink session job deployments

2023-07-06 Thread Fabio Wanner (Jira)
Fabio Wanner created FLINK-32552:


 Summary: Mixed up Flink session job deployments
 Key: FLINK-32552
 URL: https://issues.apache.org/jira/browse/FLINK-32552
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Reporter: Fabio Wanner


*Context*

As part of our end-to-end tests, we regularly deploy all of our Flink session 
jobs to a staging environment. Some of the jobs are bundled together in one Helm 
chart and are therefore deployed at the same time. There are around 40 
individual Flink jobs, all running on the same Flink session cluster. Each e2e 
test run gets its own session cluster. The problems described below occur 
rarely (maybe 1 in ~50 runs).

*Problem*

Occasionally the operator seems to "mix up" the deployments. This can be seen in 
the Flink cluster logs, where multiple 
{{Received JobGraph submission '<JOB NAME>' (<JOB ID>)}} entries are logged for 
different jobs with the same job ID. This results in errors such as:

{{DuplicateJobSubmissionException}} or {{ClassNotFoundException}}.

It's also visible in the FlinkSessionJob resource: status.jobStatus.jobName does 
not match the expected name of the job being deployed (the job name is passed to 
the application as an argument).

So far we have been unable to reliably reproduce the error.

*Details*

The following lines show the status of 3 jobs from the point of view of the Flink 
cluster dashboard and the FlinkSessionJob resource:

*aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: Restarting
 * ID: a7d36f3881f943a2
 * Exceptions: Cannot load user class: aelps.pipelines.aletsch.smc.SMCUrlMapper

FlinkSessionJob resource:
 * State: RUNNING
 * jobId: a1221c743367497b0002
 * uid: a1221c74-3367-497b-ad2f-8793ab23919d

*aletsch_mat_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: -
 * ID: -

FlinkSessionJob resource:
 * State: UPGRADING
 * jobId: -
 * uid: a7d36f38-81f9-43a0-898f-19b950430e9d

Flink K8s Operator:
 * Exceptions: DuplicateJobSubmissionException: Job has already been submitted.

*aletsch_wp_wafer_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: Running
 * ID: e692c2dfaa18441c0002
 * Exceptions: -

FlinkSessionJob Resource:
 * State: RUNNING
 * jobId: e692c2dfaa18441c0002
 * uid: e692c2df-aa18-441c-a352-88aefa9a3017

As we can see, the *aletsch_smc* job is supposedly running according to its 
FlinkSessionJob resource, but it is crash-looping in the cluster, and its jobID 
matches the uid of the *aletsch_mat* resource. Meanwhile, *aletsch_mat* is not 
running at all. The following logs also show some suspicious entries: there 
are several {{Received JobGraph submission}} entries from different jobs with 
the same jobID.

*Logs*

The logs are filtered by the 3 jobIDs from above.

 

JobID: a7d36f3881f943a2
{code:bash}
Flink Cluster
...
2023-07-06 10:23:50,552 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 
(a7d36f3881f943a2) switched from state RUNNING to RESTARTING.
2023-07-06 10:23:50 file: 
'/tmp/tm_10.0.11.159:6122-e9fadc/blobStorage/job_a7d36f3881f943a2/blob_p-40c7a30adef8868254191d2cf2dbc4cb7ab46f0d-8a02a0583d91c5e8e6c94f378aa444c2'
 (valid JAR)
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=4}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=3}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=2}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=1}]
2023-07-06 10:23:50,512 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 
(a7d36f3881f943a2) switched from state RESTARTING to RUNNING.
2023-07-06 10:23:48,979 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Clearing resource requirements of job 
{code}

[jira] [Commented] (FLINK-32412) JobID collisions in FlinkSessionJob

2023-06-22 Thread Fabio Wanner (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17736059#comment-17736059
 ] 

Fabio Wanner commented on FLINK-32412:
--

Ohh, that's a very good point, I did not think about that! I will try this out 
to be extra sure and then open a PR with the suggested solution! Thanks for the 
fast reply!

> JobID collisions in FlinkSessionJob
> ---
>
> Key: FLINK-32412
> URL: https://issues.apache.org/jira/browse/FLINK-32412
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: Fabio Wanner
>Priority: Major
>
> From time to time we see {{JobId}} collisions in our deployments due to the 
> low entropy of the generated {{JobId}}. The problem is that, although the 
> {{uid}} of the k8s resource is a UUID V4, only its {{hashCode}} is used for 
> the {{JobId}}. The {{hashCode}} is an integer, thus 32 bits. By the birthday 
> problem, we can expect a collision with 50% probability after only about 
> 77,000 random integers.
> In practice we seem to hit the problem more often, possibly because the 
> {{uid}} is not completely random, which increases the collision chance when 
> only part of it is used.
> We propose to use at least the complete 64 bits of the upper part of the 
> {{JobId}}, which would require about 5.1×10{^}9{^} IDs for a 50% collision 
> chance. One could even argue that 64 bits for the generation number are 
> probably not needed and that another 32 bits could be spent on the uid to 
> increase the entropy of the {{JobId}} even further (this would limit the 
> generation to a maximum of 4,294,967,295).
> Our suggestion for using 64 bits would be:
> {code:java}
> // Use the 64 most significant bits of the uid instead of its 32-bit hashCode
> new JobID(
>         UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(),
>         Preconditions.checkNotNull(generation));
> {code}
> Any thoughts on this? I would create a PR once we know how to proceed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32412) JobID collisions in FlinkSessionJob

2023-06-22 Thread Fabio Wanner (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabio Wanner updated FLINK-32412:
-
Description: 
From time to time we see {{JobId}} collisions in our deployments due to the 
low entropy of the generated {{JobId}}. The problem is that, although the 
{{uid}} of the k8s resource is a UUID V4, only its {{hashCode}} is used for the 
{{JobId}}. The {{hashCode}} is an integer, thus 32 bits. By the birthday 
problem, we can expect a collision with 50% probability after only about 77,000 
random integers.

In practice we seem to hit the problem more often, possibly because the 
{{uid}} is not completely random, which increases the collision chance when 
only part of it is used.

We propose to use at least the complete 64 bits of the upper part of the 
{{JobId}}, which would require about 5.1×10{^}9{^} IDs for a 50% collision 
chance. One could even argue that 64 bits for the generation number are 
probably not needed and that another 32 bits could be spent on the uid to 
increase the entropy of the {{JobId}} even further (this would limit the 
generation to a maximum of 4,294,967,295).

Our suggestion for using 64 bits would be:
{code:java}
// Use the 64 most significant bits of the uid instead of its 32-bit hashCode
new JobID(
        UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(),
        Preconditions.checkNotNull(generation));
{code}
Any thoughts on this? I would create a PR once we know how to proceed.

  was:
From time to time we see {{JobId}} collisions in our deployments due to the 
low entropy of the generated {{JobId}}. The problem is that, although the 
{{uid}} of the k8s resource is a UUID (we don't know of which version), only 
its {{hashCode}} is used for the {{JobId}}. The {{hashCode}} is an integer, 
thus 32 bits. By the birthday problem, we can expect a collision with 50% 
probability after only about 77,000 random integers.

In practice we seem to hit the problem more often, possibly because the 
{{uid}} is not completely random, which increases the collision chance when 
only part of it is used.

We propose to use at least the complete 64 bits of the upper part of the 
{{JobId}}, which would require about 5.1×10{^}9{^} IDs for a 50% collision 
chance. One could even argue that 64 bits for the generation number are 
probably not needed and that another 32 bits could be spent on the uid to 
increase the entropy of the {{JobId}} even further (this would limit the 
generation to a maximum of 4,294,967,295).

Our suggestion for using 64 bits would be:
{code:java}
// Use the 64 most significant bits of the uid instead of its 32-bit hashCode
new JobID(
        UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(),
        Preconditions.checkNotNull(generation));
{code}
Any thoughts on this? I would create a PR once we know how to proceed.




[jira] [Created] (FLINK-32412) JobID collisions in FlinkSessionJob

2023-06-22 Thread Fabio Wanner (Jira)
Fabio Wanner created FLINK-32412:


 Summary: JobID collisions in FlinkSessionJob
 Key: FLINK-32412
 URL: https://issues.apache.org/jira/browse/FLINK-32412
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Fabio Wanner


From time to time we see {{JobId}} collisions in our deployments due to the 
low entropy of the generated {{JobId}}. The problem is that, although the 
{{uid}} of the k8s resource is a UUID (we don't know of which version), only 
its {{hashCode}} is used for the {{JobId}}. The {{hashCode}} is an integer, 
thus 32 bits. By the birthday problem, we can expect a collision with 50% 
probability after only about 77,000 random integers.

In practice we seem to hit the problem more often, possibly because the 
{{uid}} is not completely random, which increases the collision chance when 
only part of it is used.
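To get a feel for the 32-bit baseline, the following sketch draws random UUID V4 strings until two of them share a {{hashCode}}. It assumes the {{uid}} is handled as a Java String and that its String#hashCode is what ends up in the {{JobId}}; the class and method names are made up for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class HashCodeCollisionSimulation {

    // Draws random UUID V4 strings until two of them share the same String#hashCode
    // and returns how many were drawn, or -1 if no collision happened within maxDraws.
    // Assumption: the 32-bit hashCode of the uid string is what ends up in the JobId.
    static int drawsUntilHashCodeCollision(int maxDraws) {
        Set<Integer> seen = new HashSet<>();
        for (int i = 1; i <= maxDraws; i++) {
            int hash = UUID.randomUUID().toString().hashCode();
            if (!seen.add(hash)) {
                return i; // the i-th draw repeated an earlier hashCode
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Run a few independent trials and print the number of draws per trial.
        for (int trial = 0; trial < 5; trial++) {
            System.out.println(drawsUntilHashCodeCollision(2_000_000));
        }
    }
}
```

If String#hashCode spreads these strings evenly, the printed counts should mostly land in the tens of thousands, in line with the estimate above; counts far below that would hint at less randomness than assumed.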

We propose to use at least the complete 64 bits of the upper part of the 
{{JobId}}, which would require about 5.1×10{^}9{^} IDs for a 50% collision 
chance. One could even argue that 64 bits for the generation number are 
probably not needed and that another 32 bits could be spent on the uid to 
increase the entropy of the {{JobId}} even further (this would limit the 
generation to a maximum of 4,294,967,295).
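Both figures follow from the usual birthday-bound approximation n ≈ sqrt(2 · ln 2 · 2{^}b{^}) for b uniformly random bits. A small sketch to reproduce them (class name hypothetical); it also includes 60 bits, since in a UUID V4 the 4 version bits of the most significant half are fixed, leaving 60 random bits there.

```java
public class BirthdayBound {

    // Approximate number of uniformly random values with the given number of bits
    // that must be drawn for a 50% chance of at least one duplicate:
    // n = sqrt(2 * ln(2) * 2^bits), approximately.
    static double idsFor50PercentCollision(int bits) {
        return Math.sqrt(2.0 * Math.log(2.0) * Math.pow(2.0, bits));
    }

    public static void main(String[] args) {
        // 32: the int hashCode of the uid
        // 60: random bits in the most significant half of a UUID V4
        // 64: a fully random 64-bit value
        for (int bits : new int[] {32, 60, 64}) {
            System.out.printf("%d bits -> %.2e IDs%n", bits, idsFor50PercentCollision(bits));
        }
    }
}
```

This gives roughly 7.7×10{^}4{^} for 32 bits, 1.3×10{^}9{^} for 60 bits and 5.1×10{^}9{^} for 64 bits.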

Our suggestion for using 64 bits would be:
{code:java}
// Use the 64 most significant bits of the uid instead of its 32-bit hashCode
new JobID(
        UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(),
        Preconditions.checkNotNull(generation));
{code}
Any thoughts on this? I would create a PR once we know how to proceed.
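For comparison, a rough sketch of how the two 64-bit halves could be filled under both options: the suggestion above, and the alternative with 96 bits of the uid and a 32-bit generation. The helper names are hypothetical; each returned pair is meant in the (lowerPart, upperPart) order of Flink's {{JobID(long, long)}} constructor, and no Flink classes are used so the snippet runs on its own.

```java
import java.util.UUID;

public class JobIdLayoutSketch {

    // Suggested variant: the 64 most significant bits of the uid plus a 64-bit generation.
    static long[] uid64Generation64(String uid, long generation) {
        UUID uuid = UUID.fromString(uid);
        return new long[] {uuid.getMostSignificantBits(), generation};
    }

    // Hypothetical alternative: 96 bits of the uid plus a 32-bit generation.
    // The second long takes its upper 32 bits from the uid's least significant half
    // and stores the generation (0 .. 4,294,967,295) in its lower 32 bits.
    static long[] uid96Generation32(String uid, long generation) {
        if (generation < 0 || generation > 0xFFFFFFFFL) {
            throw new IllegalArgumentException("generation does not fit into 32 bits: " + generation);
        }
        UUID uuid = UUID.fromString(uid);
        long second = (uuid.getLeastSignificantBits() & 0xFFFFFFFF00000000L) | generation;
        return new long[] {uuid.getMostSignificantBits(), second};
    }

    public static void main(String[] args) {
        String uid = "a1221c74-3367-497b-ad2f-8793ab23919d"; // example uid
        long[] a = uid64Generation64(uid, 2L);
        long[] b = uid96Generation32(uid, 2L);
        // Print both halves as 32 hex digits, first half first.
        System.out.printf("%016x%016x%n", a[0], a[1]);
        System.out.printf("%016x%016x%n", b[0], b[1]);
    }
}
```

In a UUID V4 the version and variant bits are fixed, so the 96-bit variant carries 90 random bits of the uid rather than 96.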





[jira] [Updated] (FLINK-30845) Params in jarURI end up in file name

2023-02-02 Thread Fabio Wanner (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabio Wanner updated FLINK-30845:
-
External issue URL:   (was: 
https://github.com/apache/flink-kubernetes-operator/pull/522)

> Params in jarURI end up in file name
> 
>
> Key: FLINK-30845
> URL: https://issues.apache.org/jira/browse/FLINK-30845
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: Fabio Wanner
>Priority: Minor
>  Labels: pull-request-available, starter
>
> *Context*
> Jar files for jobs are submitted to the operator by supplying a URI to the 
> .jar file. This URI can be a file system path or a URI to some HTTP resource. 
> If an HTTP URI is given, the file will be fetched using the 
> {{HttpArtifactFetcher}}.
> In some cases the supplied URI contains additional query parameters, for 
> example when pre-signed S3 URLs are used.
> Example:
> {code:java}
> https://some-domain.example.com/some.jar?some=params{code}
> *Problem*
> When the {{HttpArtifactFetcher}} determines the name of the .jar file, it 
> also includes the query parameters in the file name. In the example above, the 
> resulting file name would be {{some.jar?some=params}}.
> Submitting this job to Flink then results in an error, because it is checked 
> that the file name ends with {{.jar}}.
> *Possible Solution*
> In the {{HttpArtifactFetcher}} it would be enough to replace:
> {code:java}
> String fileName = FilenameUtils.getName(url.getFile());{code}
> with
> {code:java}
> String fileName = FilenameUtils.getName(url.getPath());{code}





[jira] [Updated] (FLINK-30845) Params in jarURI end up in file name

2023-02-02 Thread Fabio Wanner (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabio Wanner updated FLINK-30845:
-
External issue URL: 
https://github.com/apache/flink-kubernetes-operator/pull/522
Labels: pull-request-available starter  (was: starter)



[jira] [Commented] (FLINK-30851) flink-kubernetes-operator - operator image should provide s3 support for jarUri

2023-01-31 Thread Fabio Wanner (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682571#comment-17682571
 ] 

Fabio Wanner commented on FLINK-30851:
--

Thanks a lot for opening this ticket! We did not think of using the S3 plugin 
and went with pre-signed S3 URLs instead, but ran into this problem: 
FLINK-30845. Of course, if you only have one S3 endpoint, the plugin makes a 
lot of sense!

> flink-kubernetes-operator - operator image should provide s3 support for 
> jarUri
> ---
>
> Key: FLINK-30851
> URL: https://issues.apache.org/jira/browse/FLINK-30851
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
>Reporter: Vincent Chenal
>Priority: Minor
>
> The Docker image does not embed any filesystem plugins, neither in the 
> plugins/ nor in the opt/ folder.
> I'd like to use jarUri this way:
> {code:yaml}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: myjob
> spec:
>   deploymentName: mydeployment
>   job:
>     jarURI: s3://somebucket/myjar.jar
> {code}
> But I'm getting this error:
> {code:java}
> Could not find a file system implementation for scheme 's3'. The scheme is 
> directly supported by Flink through the following plugins: 
> flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin 
> resides within its own subfolder within the plugins directory. See 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for 
> more information. If you want to use a Hadoop file system for that scheme, 
> please add the scheme to the configuration fs.allowed-fallback-filesystems. 
> For a full list of supported file systems, please see 
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/. {code}
> I was able to make it work by putting the Flink 1.16 flink-s3-fs-presto jar 
> into the flink-kubernetes-operator image, but it would make sense to support 
> this natively.





[jira] [Created] (FLINK-30845) Params in jarURI end up in file name

2023-01-30 Thread Fabio Wanner (Jira)
Fabio Wanner created FLINK-30845:


 Summary: Params in jarURI end up in file name
 Key: FLINK-30845
 URL: https://issues.apache.org/jira/browse/FLINK-30845
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.0
Reporter: Fabio Wanner


*Context*

Jar files for jobs are submitted to the operator by supplying a URI to the .jar 
file. This URI can be a file system path or a URI to some HTTP resource. If an 
HTTP URI is given, the file will be fetched using the {{HttpArtifactFetcher}}.

In some cases the supplied URI contains additional query parameters, for 
example when pre-signed S3 URLs are used.

Example:
{code:java}
https://some-domain.example.com/some.jar?some=params{code}
*Problem*

When the {{HttpArtifactFetcher}} determines the name of the .jar file, it also 
includes the query parameters in the file name. In the example above, the 
resulting file name would be {{some.jar?some=params}}.

Submitting this job to Flink then results in an error, because it is checked 
that the file name ends with {{.jar}}.

*Possible Solution*
In the {{HttpArtifactFetcher}} it would be enough to replace:
{code:java}
String fileName = FilenameUtils.getName(url.getFile());{code}
with
{code:java}
String fileName = FilenameUtils.getName(url.getPath());{code}
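The difference between the two calls can be checked with plain {{java.net.URL}}: {{getFile()}} keeps the query string, {{getPath()}} does not. In this sketch a simple substring after the last '/' stands in for {{FilenameUtils.getName}} (which additionally handles Windows separators), so it runs without commons-io; the class and method names are hypothetical.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;

public class JarFileNameSketch {

    // Simplified stand-in for FilenameUtils.getName: keeps everything after the last '/'.
    static String lastSegment(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    // Returns {name derived via getFile(), name derived via getPath()}.
    static String[] fileNames(String spec) {
        try {
            URL url = URI.create(spec).toURL();
            // getFile() = path + "?" + query, getPath() = path only
            return new String[] {lastSegment(url.getFile()), lastSegment(url.getPath())};
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Not a valid URL: " + spec, e);
        }
    }

    public static void main(String[] args) {
        String[] names = fileNames("https://some-domain.example.com/some.jar?some=params");
        System.out.println(names[0]); // some.jar?some=params
        System.out.println(names[1]); // some.jar
    }
}
```

Only the {{getPath()}} variant yields a name ending in {{.jar}}.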


