[jira] [Commented] (FLINK-32592) (Stream)ExEnv#initializeContextEnvironment isn't thread-safe
[ https://issues.apache.org/jira/browse/FLINK-32592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745668#comment-17745668 ]

Fabio Wanner commented on FLINK-32592:
--------------------------------------

We did a few hundred deployments and did not run into the issue anymore. Looks resolved! Thanks a lot!

> (Stream)ExEnv#initializeContextEnvironment isn't thread-safe
> ------------------------------------------------------------
>
>                 Key: FLINK-32592
>                 URL: https://issues.apache.org/jira/browse/FLINK-32592
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.15.4, 1.18.0, 1.17.1
>            Reporter: Fabio Wanner
>            Assignee: Chesnay Schepler
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.18.0, 1.16.3, 1.17.2
>
> *Context*
> We are using the flink-k8s-operator to deploy multiple jobs (up to 32) to a single session cluster. The job submissions done by the operator happen concurrently, essentially at the same time.
> Operator version: 1.5.0
> Flink version: 1.15.4, 1.17.1, 1.18 (master@f37d41cf)
>
> *Problem*
> Rarely (~once every 50 deployments) one of the jobs is not executed. In the following incident 4 jobs are deployed at the same time:
> * gorner-task-staging-e5730831
> * gorner-facility-staging-e5730831
> * gorner-aepp-staging-e5730831
> * gorner-session-staging-e5730831
>
> The operator submits the jobs, and they all get a reasonable jobID:
> {code:java}
> 2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-task-staging-e5730831] Submitting job: 4968b186061e44390002 to session cluster.
> 2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-facility-staging-e5730831] Submitting job: 91a5260d916c4dff0002 to session cluster.
> 2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: 103c0446e14749a10002 to session cluster.
> 2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-session-staging-e5730831] Submitting job: de59304d370b4b8e0002 to session cluster.
> {code}
> In the cluster, the JarRunHandler's handleRequest() method receives the requests; all 4 jobIDs are present (and all args etc. are correct):
> {code:java}
> 2023-07-14 10:25:35,320 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 4968b186061e44390002
> 2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: de59304d370b4b8e0002
> 2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 91a5260d916c4dff0002
> 2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 103c0446e14749a10002
> {code}
> But once the EmbeddedExecutor's submitAndGetJobClientFuture() method is called, instead of one call per jobID we see 4 calls with one of the jobIDs appearing twice:
> {code:java}
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[4968b186061e44390002]
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[103c0446e14749a10002]
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0002]
> 2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0002]
> {code}
> In case this is important: the jobGraph obtained does not match the jobID. We get de59304d370b4b8e0002 twice, but the jobgraph for this jobID is never returned by getJobGraph() in EmbeddedExecutor.submitAndGetJobClientFuture().
> This then leads to the job already existing:
> {code:java}
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
> 2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
> 2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] -
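The failure mode named in the issue title, a non-thread-safe (Stream)ExecutionEnvironment#initializeContextEnvironment, can be illustrated with a minimal hypothetical model. None of the names below are Flink code: a single static `contextJobId` slot stands in for the shared context-environment factory. When several submissions install and read the slot concurrently, a job can execute under another job's context, which matches the duplicated optJobId in the logs above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ContextRace {
    // Stands in for the single static context slot that all submissions share.
    static volatile String contextJobId;

    // Models one job submission: install the context, do some work, read it back.
    static String submit(String jobId) throws Exception {
        contextJobId = jobId;  // "initializeContextEnvironment"
        Thread.sleep(5);       // window in which another thread can overwrite the slot
        return contextJobId;   // the user program picks up whatever context is current
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> futures = new ArrayList<>();
        for (String id : new String[]{"job-1", "job-2", "job-3", "job-4"}) {
            futures.add(pool.submit(() -> submit(id)));
        }
        Set<String> executed = new HashSet<>();
        for (Future<String> f : futures) {
            executed.add(f.get());
        }
        pool.shutdown();
        // With the shared slot this is frequently fewer than 4: some jobs ran under
        // another job's context, like the duplicated de59304d... id in the logs.
        System.out.println("distinct contexts executed: " + executed.size() + " of 4");
    }
}
```

A sequential call behaves correctly (`submit("solo")` returns `"solo"`); only the concurrent case loses contexts, which is why the bug surfaces roughly once in 50 deployments rather than every time.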
[jira] [Resolved] (FLINK-32552) Mixed up Flink session job deployments
[ https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabio Wanner resolved FLINK-32552.
----------------------------------
    Release Note: Not a bug of the Flink k8s operator.
      Resolution: Not A Bug

> Mixed up Flink session job deployments
> ---------------------------------------
>
>                 Key: FLINK-32552
>                 URL: https://issues.apache.org/jira/browse/FLINK-32552
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>            Reporter: Fabio Wanner
>            Priority: Major
>
> *Context*
> In the scope of end-to-end tests we regularly deploy all the Flink session jobs we have in a staging environment. Some of the jobs are bundled together in one helm chart and therefore deployed at the same time. There are around 40 individual Flink jobs (running on the same Flink session cluster). The session cluster is individual for each e2e test run. The problems described below happen rarely (maybe 1 in ~50 runs).
>
> *Problem*
> Rarely the operator seems to "mix up" the deployments. This can be seen in the Flink cluster logs as multiple {{Received JobGraph submission '<job name>' (<job id>)}} entries created from jobs with the same job_id. This results in errors such as {{DuplicateJobSubmissionException}} or {{ClassNotFoundException}}.
> It's also visible in the FlinkSessionJob resource: status.jobStatus.jobName does not match the expected job name of the job being deployed (the job name is passed to the application via argument).
> So far we were unable to reliably reproduce the error.
>
> *Details*
> The following lines show the status of 3 jobs from the viewpoint of the Flink cluster dashboard and the FlinkSessionJob resource:
>
> *aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
> * State: Restarting
> * ID: a7d36f3881f943a2
> * Exceptions: Cannot load user class: aelps.pipelines.aletsch.smc.SMCUrlMapper
> FlinkSessionJob Resource:
> * State: RUNNING
> * jobId: a1221c743367497b0002
> * uid: a1221c74-3367-497b-ad2f-8793ab23919d
>
> *aletsch_mat_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
> * State: -
> * ID: -
> FlinkSessionJob Resource:
> * State: UPGRADING
> * jobId: -
> * uid: a7d36f38-81f9-43a0-898f-19b950430e9d
> Flink K8s Operator:
> * Exceptions: DuplicateJobSubmissionException: Job has already been submitted.
>
> *aletsch_wp_wafer_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
> * State: Running
> * ID: e692c2dfaa18441c0002
> * Exceptions: -
> FlinkSessionJob Resource:
> * State: RUNNING
> * jobId: e692c2dfaa18441c0002
> * uid: e692c2df-aa18-441c-a352-88aefa9a3017
>
> As we can see, the *aletsch_smc* job is presumably running according to the FlinkSessionJob resource, but is crash-looping in the cluster, and its jobID matches the uid of the resource of {*}aletsch_mat{*}, while *aletsch_mat* is not even running. The following logs also show some suspicious entries: there are several {{Received JobGraph submission}} entries from different jobs with the same jobID.
>
> *Logs*
> The logs are filtered by the 3 jobIds from above.
>
> JobID: a7d36f3881f943a2
> {code:bash}
> Flink Cluster
> ...
> 2023-07-06 10:23:50,552 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 (a7d36f3881f943a2) switched from state RUNNING to RESTARTING.
> 2023-07-06 10:23:50 file: '/tmp/tm_10.0.11.159:6122-e9fadc/blobStorage/job_a7d36f3881f943a2/blob_p-40c7a30adef8868254191d2cf2dbc4cb7ab46f0d-8a02a0583d91c5e8e6c94f378aa444c2' (valid JAR)
> 2023-07-06 10:23:50,522 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job a7d36f3881f943a2: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=4}]
> 2023-07-06 10:23:50,522 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job a7d36f3881f943a2: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=3}]
> 2023-07-06 10:23:50,522 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job a7d36f3881f943a2: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
> 2023-07-06 10:23:50,522 INFO
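A side observation that makes the status dump above easier to read: the operator-assigned jobIds appear to be derived from the FlinkSessionJob resource uid, namely the first 16 hex digits of the uid followed by a 4-digit counter (uid e692c2df-aa18-441c-... yields jobId e692c2dfaa18441c0002, and the crash-looping smc job carries a jobId built from aletsch_mat's uid a7d36f38-81f9-...). This mapping is inferred purely from the IDs quoted in this report, not from the operator's source code; a hypothetical sketch:

```java
// Hypothetical reconstruction of the observed uid -> jobId mapping.
// Inferred only from the IDs in the report above; the real operator's
// algorithm may differ.
public class JobIdFromUid {
    static String jobIdFor(String resourceUid, int generation) {
        // Strip the dashes, keep the first 16 hex digits, append a 4-digit counter.
        String hex = resourceUid.replace("-", "").substring(0, 16);
        return hex + String.format("%04x", generation);
    }

    public static void main(String[] args) {
        // Matches the aletsch_wp_wafer entry above: e692c2dfaa18441c0002
        System.out.println(jobIdFor("e692c2df-aa18-441c-a352-88aefa9a3017", 2));
    }
}
```

Under this reading, a running job whose jobId encodes a *different* resource's uid is direct evidence that two submissions got mixed up, which is exactly the smc/mat situation described above.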
[jira] [Commented] (FLINK-32592) (Stream)ExEnv#initializeContextEnvironment isn't thread-safe
[ https://issues.apache.org/jira/browse/FLINK-32592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743495#comment-17743495 ]

Fabio Wanner commented on FLINK-32592:
--------------------------------------

Hey! Sorry for my late response and thanks a lot for looking into the issue! As the job submission is done by the official flink k8s operator, we do not have direct control over how the jobs are run. I am currently on vacation, but I will be able to verify your fix on Wednesday!

> (Stream)ExEnv#initializeContextEnvironment isn't thread-safe
> ------------------------------------------------------------
>
>                 Key: FLINK-32592
>                 URL: https://issues.apache.org/jira/browse/FLINK-32592
[jira] [Commented] (FLINK-32592) Mixed-up job execution on concurrent job submission
[ https://issues.apache.org/jira/browse/FLINK-32592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743136#comment-17743136 ]

Fabio Wanner commented on FLINK-32592:
--------------------------------------

I would be happy to assist in fixing this issue, but could really use some pointers to find the root cause of the problem...

> Mixed-up job execution on concurrent job submission
> ---------------------------------------------------
>
>                 Key: FLINK-32592
>                 URL: https://issues.apache.org/jira/browse/FLINK-32592
[jira] [Commented] (FLINK-32552) Mixed up Flink session job deployments
[ https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743135#comment-17743135 ]

Fabio Wanner commented on FLINK-32552:
--------------------------------------

BTW: As a really bad workaround, the problem can be "fixed" by adding big enough random delays in the AbstractFlinkResourceReconciler's reconcile() method on first deployment...

> Mixed up Flink session job deployments
> ---------------------------------------
>
>                 Key: FLINK-32552
>                 URL: https://issues.apache.org/jira/browse/FLINK-32552
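The "really bad workaround" mentioned in the comment above can be sketched as follows. `reconcileFirstDeployment` and the 250 ms bound are illustrative names and values, not operator API (the real method is the operator's reconcile() on first deployment); as the comment itself says, jitter only makes the race less likely, it does not remove it.

```java
import java.util.concurrent.ThreadLocalRandom;

public class JitteredSubmit {
    // Hypothetical stand-in for the operator's first-deployment reconcile step:
    // sleep a random amount before submitting so concurrent submissions rarely
    // hit the JobManager at the same instant. This masks the race; it is not a fix.
    static void reconcileFirstDeployment(Runnable submitJob) throws InterruptedException {
        long jitterMs = ThreadLocalRandom.current().nextLong(0, 250);
        Thread.sleep(jitterMs);
        submitJob.run();
    }

    public static void main(String[] args) throws Exception {
        reconcileFirstDeployment(() -> System.out.println("submitting job"));
    }
}
```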
[jira] [Comment Edited] (FLINK-32552) Mixed up Flink session job deployments
[ https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743096#comment-17743096 ]

Fabio Wanner edited comment on FLINK-32552 at 7/14/23 12:27 PM:
----------------------------------------------------------------

It's definitely not a bug in the operator. I deployed 4 different jobs at the same time and observed the following: 4 calls are made to the method JarRunHandler.handleRequest(), and when looking at the requests, 4 distinct ids for the 4 jobs are present. But the EmbeddedExecutor's submitAndGetJobClientFuture() method (also called 4 times) will have 3 distinct optJobIds and one duplicate, leading to different flavors of the problem described in this bug ticket (depending on the exact timing of the parallel job launches).

Btw: Flink versions 1.15.4 & 1.17.1 show this issue.

I have opened a new bug ticket with the relevant information: https://issues.apache.org/jira/browse/FLINK-32592

was (Author: JIRAUSER298842):
It's definitely not a bug in the operator. I deployed 4 different jobs at the same time and observed the following: 4 calls are made to the method JarRunHandler.handleRequest(), and when looking at the requests, 4 distinct ids for the 4 jobs are present. But the EmbeddedExecutor's submitAndGetJobClientFuture() method (also called 4 times) will have 3 distinct optJobIds and one duplicate, leading to different flavors of the problem described in this bug ticket (depending on the exact timing of the parallel job launches).

Btw: Flink versions 1.15.4 & 1.17.1 show this issue.

I will open a new bug ticket with the relevant information and link it here.

> Mixed up Flink session job deployments
> ---------------------------------------
>
>                 Key: FLINK-32552
>                 URL: https://issues.apache.org/jira/browse/FLINK-32552
[jira] [Created] (FLINK-32592) Mixed-up job execution on concurrent job submission
Fabio Wanner created FLINK-32592:
------------------------------------

             Summary: Mixed-up job execution on concurrent job submission
                 Key: FLINK-32592
                 URL: https://issues.apache.org/jira/browse/FLINK-32592
             Project: Flink
          Issue Type: Bug
          Components: Client / Job Submission
    Affects Versions: 1.17.1, 1.15.4, 1.18.0
            Reporter: Fabio Wanner

*Context*
We are using the flink-k8s-operator to deploy multiple jobs (up to 32) to a single session cluster. The job submissions done by the operator happen concurrently, essentially at the same time.

Operator version: 1.5.0
Flink version: 1.15.4, 1.17.1, 1.18 (master@f37d41cf)

*Problem*
Rarely (~once every 50 deployments) one of the jobs is not executed. In the following incident 4 jobs are deployed at the same time:
* gorner-task-staging-e5730831
* gorner-facility-staging-e5730831
* gorner-aepp-staging-e5730831
* gorner-session-staging-e5730831

The operator submits the jobs, and they all get a reasonable jobID:
{code:java}
2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-task-staging-e5730831] Submitting job: 4968b186061e44390002 to session cluster.
2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-facility-staging-e5730831] Submitting job: 91a5260d916c4dff0002 to session cluster.
2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: 103c0446e14749a10002 to session cluster.
2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-session-staging-e5730831] Submitting job: de59304d370b4b8e0002 to session cluster.
{code}
In the cluster, the JarRunHandler's handleRequest() method receives the requests; all 4 jobIDs are present (and all args etc. are correct):
{code:java}
2023-07-14 10:25:35,320 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 4968b186061e44390002
2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: de59304d370b4b8e0002
2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 91a5260d916c4dff0002
2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 103c0446e14749a10002
{code}
But once the EmbeddedExecutor's submitAndGetJobClientFuture() method is called, instead of one call per jobID we see 4 calls with one of the jobIDs appearing twice:
{code:java}
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[4968b186061e44390002]
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[103c0446e14749a10002]
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0002]
2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0002]
{code}
In case this is important: the jobGraph obtained does not match the jobID. We get de59304d370b4b8e0002 twice, but the jobgraph for this jobID is never returned by getJobGraph() in EmbeddedExecutor.submitAndGetJobClientFuture().

This then leads to the job already existing:
{code:java}
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: [de59304d370b4b8e0002]
{code}
But since the jobs are completely different, the execution will fail, depending on the timing, with one of the following exceptions:
* RestHandlerException: No jobs included in application
* ClassNotFoundException: io.dectris.aelps.pipelines.gorner.facility.FacilityEventProcessor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
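The issue was ultimately fixed in Flink itself (FLINK-32592 lists Fix For versions elsewhere in this thread). The committed patch is not shown here, but one standard remedy for a shared static slot of this kind is per-thread scoping with ThreadLocal; the following is a hypothetical toy model, not Flink code, showing why per-thread contexts cannot be clobbered by concurrent submissions.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadLocalContext {
    // Per-thread context slot: concurrent submissions no longer share state.
    static final ThreadLocal<String> contextJobId = new ThreadLocal<>();

    static String submit(String jobId) throws Exception {
        contextJobId.set(jobId);  // install this submission's context on this thread only
        Thread.sleep(5);          // other threads run concurrently without interfering
        String executed = contextJobId.get();
        contextJobId.remove();    // clean up, mirroring a reset of the context
        return executed;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> futures = new ArrayList<>();
        for (String id : new String[]{"job-1", "job-2", "job-3", "job-4"}) {
            futures.add(pool.submit(() -> submit(id)));
        }
        Set<String> executed = new HashSet<>();
        for (Future<String> f : futures) {
            executed.add(f.get());
        }
        pool.shutdown();
        // Every submission executes under its own context: always 4 distinct ids.
        System.out.println("distinct contexts executed: " + executed.size() + " of 4");
    }
}
```

The trade-off of ThreadLocal scoping is that the context is invisible to child threads unless an InheritableThreadLocal or explicit hand-off is used, which is why a shared static slot is sometimes kept alongside it.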
[jira] [Comment Edited] (FLINK-32552) Mixed up Flink session job deployments
[ https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743096#comment-17743096 ] Fabio Wanner edited comment on FLINK-32552 at 7/14/23 10:43 AM: It's definitely not a bug in the operator: I deployed 4 different jobs at the same time and observed the following: JarRunHandler.handleRequest() is called 4 times, and the requests carry 4 distinct ids for the 4 jobs, but the EmbeddedExecutor's submitAndGetJobClientFuture() method (also called 4 times) sees only 3 distinct optJobIds and one duplicate, leading to different flavors of the problem described in this bug ticket (depending on the exact timing of the parallel job launches). Btw: Flink versions 1.15.4 & 1.17.1 show this issue. I will open a new bug ticket with the relevant information and link it here.
> Mixed up Flink session job deployments
> --
>
> Key: FLINK-32552
> URL: https://issues.apache.org/jira/browse/FLINK-32552
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Reporter: Fabio Wanner
> Priority: Major
>
> *Context*
> In the scope of end-to-end tests we regularly deploy all the Flink session jobs we have in a staging environment. Some of the jobs are bundled together in one helm chart and are therefore deployed at the same time. There are around 40 individual Flink jobs (running on the same Flink session cluster). The session cluster is separate for each e2e test run. The problems described below happen rarely (maybe 1 in ~50 runs).
> *Problem*
> Rarely the operator seems to "mix up" the deployments. This can be seen in the Flink cluster logs, as multiple {{Received JobGraph submission '<JOB NAME>' (<JOB ID>)}} entries are created from jobs with the same job_id. This results in errors such as {{DuplicateJobSubmissionException}} or {{ClassNotFoundException}}.
> It's also visible in the FlinkSessionJob resource: status.jobStatus.jobName does not match the expected job name of the job being deployed (the job name is passed to the application via argument).
> So far we were unable to reliably reproduce the error.
> *Details*
> The following lines show the status of 3 jobs from the viewpoint of the Flink cluster dashboard and the FlinkSessionJob resource:
>
> *aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
> * State: Restarting
> * ID: a7d36f3881f943a2
> * Exceptions: Cannot load user class: aelps.pipelines.aletsch.smc.SMCUrlMapper
> FlinkSessionJob Resource:
> * State: RUNNING
> * jobId: a1221c743367497b0002
> * uid: a1221c74-3367-497b-ad2f-8793ab23919d
>
> *aletsch_mat_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
> * State: -
> * ID: -
> FlinkSessionJob Resource:
> * State: UPGRADING
> * jobId: -
> * uid: a7d36f38-81f9-43a0-898f-19b950430e9d
> Flink K8s Operator:
> * Exceptions: DuplicateJobSubmissionException: Job has already been submitted.
>
> *aletsch_wp_wafer_e5730831db8092adb12f5189c4c895ef3a268615*
> Apache Flink Dashboard:
> * State: Running
> * ID: e692c2dfaa18441c0002
> * Exceptions: -
> FlinkSessionJob Resource:
> * State: RUNNING
> * jobId: e692c2dfaa18441c0002
> * uid: e692c2df-aa18-441c-a352-88aefa9a3017
> As we can see, the *aletsch_smc* job is presumably running according to the FlinkSessionJob resource, but it is crash-looping in the cluster and has the jobID matching the uid of the *aletsch_mat* resource, while *aletsch_mat* is not even running. The following logs also show some suspicious entries: there are several {{Received JobGraph submission}} entries from different jobs with the same jobID.
>
> *Logs*
> The logs are filtered by the 3 jobIds from above.
>
> JobID: a7d36f3881f943a2
> {code:bash}
> Flink Cluster
> ...
> 2023-07-06 10:23:50,552 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 (a7d36f3881f943a2) switched from state RUNNING to RESTARTING.
> 2023-07-06 10:23:50
[jira] [Commented] (FLINK-32552) Mixed up Flink session job deployments
[ https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743096#comment-17743096 ] Fabio Wanner commented on FLINK-32552: -- It's definitely not a bug in the operator: I deployed 4 different jobs at the same time and observed the following: JarRunHandler.handleRequest() is called 4 times, and the requests carry 4 distinct ids for the 4 jobs, but the EmbeddedExecutor's submitAndGetJobClientFuture() method (also called 4 times) sees only 3 distinct optJobIds and one duplicate, leading to different flavors of the problem described in this bug ticket (depending on the exact timing of the parallel job launches). I will open a new bug ticket with the relevant information and link it here.
[jira] [Commented] (FLINK-32552) Mixed up Flink session job deployments
[ https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742453#comment-17742453 ] Fabio Wanner commented on FLINK-32552: -- I dug a bit deeper and found the following: whenever the error happens, there is one job deployment with this exception:
{code:java}
org.apache.flink.runtime.rest.handler.RestHandlerException: No jobs included in application.{code}
This job will be deployed again by the operator and will then run without any problems. But one of the other jobs deployed in parallel gets mixed up with the failed one and never starts (it uses the jar file of the failed one, leading to a ClassNotFoundException, or ends up with the same jobId and/or the same name)...
[jira] [Updated] (FLINK-32552) Mixed up Flink session job deployments
[ https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabio Wanner updated FLINK-32552: - Description:
[jira] [Commented] (FLINK-32552) Mixed up Flink session job deployments
[ https://issues.apache.org/jira/browse/FLINK-32552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740873#comment-17740873 ] Fabio Wanner commented on FLINK-32552: -- I would very much appreciate some pointers on where this issue could originate from. So far I have been unable to find the root cause (or reproduce it reliably). Ahh, and we are running the operator on version 1.5 with this fix included: https://issues.apache.org/jira/browse/FLINK-32412
[jira] [Created] (FLINK-32552) Mixed up Flink session job deployments
Fabio Wanner created FLINK-32552: Summary: Mixed up Flink session job deployments Key: FLINK-32552 URL: https://issues.apache.org/jira/browse/FLINK-32552 Project: Flink Issue Type: Bug Components: Kubernetes Operator Reporter: Fabio Wanner
[jira] [Commented] (FLINK-32412) JobID collisions in FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-32412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17736059#comment-17736059 ] Fabio Wanner commented on FLINK-32412: -- Ohh, that's a very good point, I did not think about that! I will try this out to be extra sure and then open a PR with the suggested solution! Thanks for the fast reply! > JobID collisions in FlinkSessionJob > --- > > Key: FLINK-32412 > URL: https://issues.apache.org/jira/browse/FLINK-32412 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: Fabio Wanner >Priority: Major > > From time to time we see {{JobId}} collisions in our deployments due to the > low entropy of the generated {{{}JobId{}}}. The problem is that, although the > {{uid}} from the k8s-resource (which is a UUID V4), only the {{hashCode}} of > it will be used for the {{{}JobId{}}}. The {{hashCode}} is an integer, thus > 32 bits. If we look at the birthday problem theorem we can expect a collision > with a 50% chance with only 77000 random integers. > In reality we seem to see the problem more often, but this could be because > the {{uid}} might not be completely random, therefore increasing the chances > if we just use parts of it. > We propose to at least use the complete 64 bits of the upper part of the > {{{}JobId{}}}, where 5.1×10{^}9{^} IDs are needed for a collision chance of > 50%. We could even argue that most probably 64 bit for the generation number > is not needed and another 32 bit could be spent on the uid to increase the > entropy of the {{JobId}} even further (This would mean the max generation > would be 4,294,967,295). > Our suggestion for using 64 bits would be: > {code:java} > new JobID( > > UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(), > Preconditions.checkNotNull(generation) > ); > {code} > Any thoughts on this? I would create a PR once we know how to proceed. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
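The birthday-problem figures quoted in FLINK-32412 above (~77,000 random 32-bit IDs, or ~5.1×10^9^ random 64-bit IDs, for a 50% collision chance) can be reproduced with the standard approximation n ≈ √(2·N·ln 2). A minimal standalone check (plain Java, not Flink or operator code):

```java
public class BirthdayBound {
    // Number of uniformly random IDs drawn from a space of 2^bits values
    // at which the collision probability reaches roughly 50%:
    // n ≈ sqrt(2 * N * ln 2), the usual birthday-problem approximation.
    static double idsForHalfCollision(int bits) {
        return Math.sqrt(2.0 * Math.pow(2.0, bits) * Math.log(2.0));
    }

    public static void main(String[] args) {
        // 32-bit space (an int hashCode): roughly 77,000 IDs suffice.
        System.out.printf("32-bit space: %.0f IDs%n", idsForHalfCollision(32));
        // 64-bit space (the UUID's most significant bits): roughly 5.1e9 IDs.
        System.out.printf("64-bit space: %.2e IDs%n", idsForHalfCollision(64));
    }
}
```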
[jira] [Updated] (FLINK-32412) JobID collisions in FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-32412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabio Wanner updated FLINK-32412: - Description: From time to time we see {{JobId}} collisions in our deployments due to the low entropy of the generated {{{}JobId{}}}. The problem is that, although the {{uid}} from the k8s-resource is a UUID V4, only its {{hashCode}} will be used for the {{{}JobId{}}}. The {{hashCode}} is an integer, thus 32 bits. If we look at the birthday problem we can expect a collision with a 50% chance with only 77000 random integers. In reality we seem to see the problem more often, but this could be because the {{uid}} might not be completely random, therefore increasing the chances if we just use parts of it. We propose to at least use the complete 64 bits of the upper part of the {{{}JobId{}}}, where 5.1×10^9^ IDs are needed for a collision chance of 50%. We could even argue that most probably 64 bits for the generation number are not needed and another 32 bits could be spent on the uid to increase the entropy of the {{JobId}} even further (this would mean the max generation would be 4,294,967,295). Our suggestion for using 64 bits would be:
{code:java}
new JobID(
    UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(),
    Preconditions.checkNotNull(generation)
);
{code}
Any thoughts on this? I would create a PR once we know how to proceed.

was: From time to time we see {{JobId}} collisions in our deployments due to the low entropy of the generated {{{}JobId{}}}. The problem is that, although the {{uid}} from the k8s-resource is a UUID (but we don't know of which version), only its {{hashCode}} will be used for the {{{}JobId{}}}. The {{hashCode}} is an integer, thus 32 bits. If we look at the birthday problem we can expect a collision with a 50% chance with only 77000 random integers.
In reality we seem to see the problem more often, but this could be because the {{uid}} might not be completely random, therefore increasing the chances if we just use parts of it. We propose to at least use the complete 64 bits of the upper part of the {{{}JobId{}}}, where 5.1×10{^}9{^} IDs are needed for a collision chance of 50%. We could even argue that most probably 64 bit for the generation number is not needed and another 32 bit could be spent on the uid to increase the entropy of the {{JobId}} even further (This would mean the max generation would be 4,294,967,295). Our suggestion for using 64 bits would be: {code:java} new JobID( UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(), Preconditions.checkNotNull(generation) ); {code} Any thoughts on this? I would create a PR once we know how to proceed. > JobID collisions in FlinkSessionJob > --- > > Key: FLINK-32412 > URL: https://issues.apache.org/jira/browse/FLINK-32412 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: Fabio Wanner >Priority: Major > > From time to time we see {{JobId}} collisions in our deployments due to the > low entropy of the generated {{{}JobId{}}}. The problem is that, although the > {{uid}} from the k8s-resource (which is a UUID V4), only the {{hashCode}} of > it will be used for the {{{}JobId{}}}. The {{hashCode}} is an integer, thus > 32 bits. If we look at the birthday problem theorem we can expect a collision > with a 50% chance with only 77000 random integers. > In reality we seem to see the problem more often, but this could be because > the {{uid}} might not be completely random, therefore increasing the chances > if we just use parts of it. > We propose to at least use the complete 64 bits of the upper part of the > {{{}JobId{}}}, where 5.1×10{^}9{^} IDs are needed for a collision chance of > 50%. 
We could even argue that most probably 64 bit for the generation number > is not needed and another 32 bit could be spent on the uid to increase the > entropy of the {{JobId}} even further (This would mean the max generation > would be 4,294,967,295). > Our suggestion for using 64 bits would be: > {code:java} > new JobID( > > UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(), > Preconditions.checkNotNull(generation) > ); > {code} > Any thoughts on this? I would create a PR once we know how to proceed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32412) JobID collisions in FlinkSessionJob
Fabio Wanner created FLINK-32412:
Summary: JobID collisions in FlinkSessionJob
Key: FLINK-32412
URL: https://issues.apache.org/jira/browse/FLINK-32412
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Fabio Wanner

From time to time we see {{JobId}} collisions in our deployments due to the low entropy of the generated {{{}JobId{}}}. The problem is that, although the {{uid}} from the k8s-resource is a UUID (but we don't know of which version), only its {{hashCode}} will be used for the {{{}JobId{}}}. The {{hashCode}} is an integer, thus 32 bits. If we look at the birthday problem we can expect a collision with a 50% chance with only 77000 random integers. In reality we seem to see the problem more often, but this could be because the {{uid}} might not be completely random, therefore increasing the chances if we just use parts of it. We propose to at least use the complete 64 bits of the upper part of the {{{}JobId{}}}, where 5.1×10^9^ IDs are needed for a collision chance of 50%. We could even argue that most probably 64 bits for the generation number are not needed and another 32 bits could be spent on the uid to increase the entropy of the {{JobId}} even further (this would mean the max generation would be 4,294,967,295). Our suggestion for using 64 bits would be:
{code:java}
new JobID(
    UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(),
    Preconditions.checkNotNull(generation)
);
{code}
Any thoughts on this? I would create a PR once we know how to proceed.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
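As a standalone illustration of the entropy difference (plain {{java.util.UUID}}, not operator code; the uid and generation values below are examples, and the {{hashCode}} line is only a sketch of the current scheme as described above):

```java
import java.util.UUID;

public class JobIdEntropy {
    public static void main(String[] args) {
        // Example values; a real uid comes from the k8s resource metadata.
        UUID uid = UUID.fromString("e692c2df-aa18-441c-a352-88aefa9a3017");
        long generation = 3L;

        // Current scheme (sketch): only a 32-bit hashCode of the uid string
        // reaches the upper half of the JobID.
        long upperCurrent = uid.toString().hashCode();

        // Proposed scheme: keep all 64 most significant bits of the UUID.
        long upperProposed = uid.getMostSignificantBits();

        System.out.printf("current upper:  %016x%n", upperCurrent);
        System.out.printf("proposed upper: %016x, lower: %016x%n",
                upperProposed, generation);
    }
}
```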
[jira] [Updated] (FLINK-30845) Params in jarURI end up in file name
[ https://issues.apache.org/jira/browse/FLINK-30845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabio Wanner updated FLINK-30845: - External issue URL: (was: https://github.com/apache/flink-kubernetes-operator/pull/522) > Params in jarURI end up in file name > > > Key: FLINK-30845 > URL: https://issues.apache.org/jira/browse/FLINK-30845 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.3.0 >Reporter: Fabio Wanner >Priority: Minor > Labels: pull-request-available, starter > > *Context* > Jar files for jobs are submitted to the operator by supplying a URI to the > .jar file. This URI can be a file system path or a URI to some HTTP resource. > If a HTTP URI is given, the file will be fetched using the > {{{}HttpArtifactFetcher{}}}. > There are cases where the supplied URI will contain additional params. For > example if pre-signed S3 URLs are used. > Example: > {code:java} > https://some-domain.example.com/some.jar?some=params{code} > *Problem* > When the HttpArtifactFetcher determines the name of the .jar file it does > also use the params as part of the file name. In the example from above the > resulting file name would be: {{some.jar?some=params}} > Submitting this job to Flink will result in an error as it will be checked > for the file name to end with {{.jar}} > *Possible Solution* > In the {{HttpArtifactFetcher}} it would be enough to replace: > {code:java} > String fileName = FilenameUtils.getName(url.getFile());{code} > with > {code:java} > String fileName = FilenameUtils.getName(url.getPath());{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
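The one-line fix quoted above rests on standard {{java.net.URL}} behavior: {{getFile()}} returns the path plus the query string, while {{getPath()}} returns the path alone. A minimal demonstration with the example URL from the description (the last path segment is extracted by hand here instead of via commons-io's {{FilenameUtils.getName}}):

```java
import java.net.URL;

public class JarNameDemo {
    // Hand-rolled stand-in for FilenameUtils.getName: text after the last '/'.
    static String lastSegment(String s) {
        return s.substring(s.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) throws Exception {
        URL url = new URL("https://some-domain.example.com/some.jar?some=params");
        System.out.println(lastSegment(url.getFile())); // some.jar?some=params
        System.out.println(lastSegment(url.getPath())); // some.jar
    }
}
```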
[jira] [Updated] (FLINK-30845) Params in jarURI end up in file name
[ https://issues.apache.org/jira/browse/FLINK-30845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabio Wanner updated FLINK-30845: - External issue URL: https://github.com/apache/flink-kubernetes-operator/pull/522 Labels: pull-request-available starter (was: starter) > Params in jarURI end up in file name > > > Key: FLINK-30845 > URL: https://issues.apache.org/jira/browse/FLINK-30845 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.3.0 >Reporter: Fabio Wanner >Priority: Minor > Labels: pull-request-available, starter > > *Context* > Jar files for jobs are submitted to the operator by supplying a URI to the > .jar file. This URI can be a file system path or a URI to some HTTP resource. > If a HTTP URI is given, the file will be fetched using the > {{{}HttpArtifactFetcher{}}}. > There are cases where the supplied URI will contain additional params. For > example if pre-signed S3 URLs are used. > Example: > {code:java} > https://some-domain.example.com/some.jar?some=params{code} > *Problem* > When the HttpArtifactFetcher determines the name of the .jar file it does > also use the params as part of the file name. In the example from above the > resulting file name would be: {{some.jar?some=params}} > Submitting this job to Flink will result in an error as it will be checked > for the file name to end with {{.jar}} > *Possible Solution* > In the {{HttpArtifactFetcher}} it would be enough to replace: > {code:java} > String fileName = FilenameUtils.getName(url.getFile());{code} > with > {code:java} > String fileName = FilenameUtils.getName(url.getPath());{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30851) flink-kubernetes-operator - operator image should provide s3 support for jarUri
[ https://issues.apache.org/jira/browse/FLINK-30851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682571#comment-17682571 ] Fabio Wanner commented on FLINK-30851: -- Thanks a lot for opening this ticket! We did not think about using the S3 plugin and went with presigned S3 URLs instead, but ran into this problem: FLINK-30845. But of course, if you only have one S3 endpoint the plugin makes a lot of sense! > flink-kubernetes-operator - operator image should provide s3 support for > jarUri > --- > > Key: FLINK-30851 > URL: https://issues.apache.org/jira/browse/FLINK-30851 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.3.1 >Reporter: Vincent Chenal >Priority: Minor > > The docker image does not embed filesystem plugins. > Neither in plugins/ neither in opt/ folders. > I'd like to use jarUri this way: > {code:java} > apiVersion: flink.apache.org/v1beta1 > kind: FlinkSessionJob > metadata: > name: myjob > spec: > deploymentName: mydeployment > job: > jarURI: s3://somebucket/myjar.jar {code} > But I'm getting this error: > {code:java} > Could not find a file system implementation for scheme 's3'. The scheme is > directly supported by Flink through the following plugins: > flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin > resides within its own subfolder within the plugins directory. See > https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for > more information. If you want to use a Hadoop file system for that scheme, > please add the scheme to the configuration fs.allowed-fallback-filesystems. > For a full list of supported file systems, please see > https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/. {code} > I was able to make it work by putting flink 1.16 s3-presto-fs jar within the > flink-kubernetes-operator image but it would make sense to have it natively. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
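For reference, the documented way to enable a filesystem plugin in a Flink image is to copy its jar from {{opt/}} into its own subfolder under {{plugins/}}, as the error message above suggests. A sketch for a custom image build (the Flink home path and jar version are assumptions):

```shell
# Run when building a custom image; adjust FLINK_HOME and the plugin version.
FLINK_HOME="${FLINK_HOME:-/opt/flink}"
mkdir -p "$FLINK_HOME/plugins/s3-fs-presto"
cp "$FLINK_HOME"/opt/flink-s3-fs-presto-*.jar "$FLINK_HOME/plugins/s3-fs-presto/"
```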
[jira] [Created] (FLINK-30845) Params in jarURI end up in file name
Fabio Wanner created FLINK-30845: Summary: Params in jarURI end up in file name Key: FLINK-30845 URL: https://issues.apache.org/jira/browse/FLINK-30845 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.3.0 Reporter: Fabio Wanner *Context* Jar files for jobs are submitted to the operator by supplying a URI to the .jar file. This URI can be a file system path or a URI to some HTTP resource. If a HTTP URI is given, the file will be fetched using the {{{}HttpArtifactFetcher{}}}. There are cases where the supplied URI will contain additional params. For example if pre-signed S3 URLs are used. Example: {code:java} https://some-domain.example.com/some.jar?some=params{code} *Problem* When the HttpArtifactFetcher determines the name of the .jar file it does also use the params as part of the file name. In the example from above the resulting file name would be: {{some.jar?some=params}} Submitting this job to Flink will result in an error as it will be checked for the file name to end with {{.jar}} *Possible Solution* In the {{HttpArtifactFetcher}} it would be enough to replace: {code:java} String fileName = FilenameUtils.getName(url.getFile());{code} with {code:java} String fileName = FilenameUtils.getName(url.getPath());{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)