Re: Native kubernetes execution and History server
Thanks Guowei for the comments and Lukáš Drbal for sharing the feedback.

This is not only an issue for Kubernetes application mode: in Yarn application and standalone application mode as well, the job id is set to ZERO in HA mode if it is not configured explicitly. For standalone application mode, we can use "--job-id" to specify a user-defined job id. However, for Yarn and Kubernetes applications we do not have a public config option for this. Considering that we might support multiple jobs in a single Flink application with HA enabled in the future, introducing such a public config option may be inopportune.

Best,
Yang
Re: Native kubernetes execution and History server
Hello Guowei,

I just checked it and it works! Thanks a lot!

Here is the workaround, which uses a random UUID as the jobId:

-D\$internal.pipeline.job-id=$(cat /proc/sys/kernel/random/uuid | tr -d "-")

L.
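The workaround above can be sketched as a small shell snippet: generate a fresh job id from the kernel's random UUID (a Flink JobID is rendered as 32 hex characters, which is exactly what a dash-stripped UUID yields) and pass it via the internal option. The flink invocation in the comment is illustrative and abbreviated, not a complete command:

```shell
# Generate a random 32-hex-char job id (UUID with dashes stripped).
JOB_ID=$(cat /proc/sys/kernel/random/uuid | tr -d "-")
echo "$JOB_ID"

# Illustrative use with run-application; the other -D options are omitted.
# The backslash keeps the shell from treating $internal as a variable.
# /opt/flink/bin/flink run-application --target kubernetes-application \
#     -D\$internal.pipeline.job-id="$JOB_ID" \
#     local:///opt/flink/usrlib/job.jar
```

Because the id is regenerated on every invocation, each CronJob run submits under a distinct JobID instead of colliding with the previous run's HA metadata.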
Re: Native kubernetes execution and History server
Hi,

Thanks for providing the logs. From the logs, this is a known bug [1]. Maybe you could use `$internal.pipeline.job-id` to set your own job id (thanks to Wang Yang). But keep in mind that this option is only for internal use and may be changed in some release, so you should keep an eye on [1] for the correct solution.

[1] https://issues.apache.org/jira/browse/FLINK-19358

Best,
Guowei
Re: Native kubernetes execution and History server
Hello,

sure. Here is the log from the first run, which succeeded: https://pastebin.com/tV75ZS5S
and here is the log from the second run (it is the same for all subsequent runs): https://pastebin.com/pwTFyGvE

My Dockerfile is pretty simple, just WordCount + S3:

FROM flink:1.12.2

RUN mkdir -p $FLINK_HOME/usrlib
COPY flink-examples-batch_2.12-1.12.2-WordCount.jar $FLINK_HOME/usrlib/wordcount.jar

RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/

Thanks!
Re: Native kubernetes execution and History server
Hi,

After some discussion with Wang Yang offline, it seems that there might have been a jobmanager failover. Would you like to share the full jobmanager log?

Best,
Guowei
Native kubernetes execution and History server
Hi,

I would like to use native kubernetes execution [1] for one batch job and let kubernetes handle the scheduling. Flink version: 1.12.2.

Kubernetes job:

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: scheduled-job
spec:
  schedule: "*/1 * * * *"
  jobTemplate:
    spec:
      template:
        metadata:
          labels:
            app: super-flink-batch-job
        spec:
          containers:
          - name: runner
            image: localhost:5000/batch-flink-app-v3:latest
            imagePullPolicy: Always
            command:
            - /bin/sh
            - -c
            - /opt/flink/bin/flink run-application --target kubernetes-application
              -Dkubernetes.service-account=flink-service-account
              -Dkubernetes.rest-service.exposed.type=NodePort
              -Dkubernetes.cluster-id=batch-job-cluster
              -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
              -Ds3.endpoint=http://minio-1616518256:9000
              -Ds3.access-key=ACCESSKEY
              -Ds3.secret-key=SECRETKEY
              -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
              -Ds3.path-style-access=true
              -Ds3.ssl.enabled=false
              -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
              -Dhigh-availability.storageDir=s3://flink/flink-ha
              local:///opt/flink/usrlib/job.jar
          restartPolicy: OnFailure

This works well for me, but I would also like to write the result to the archive path and show it in the History server (running as a separate deployment in k8s).

Every run it creates a job with the same JobId, which obviously leads to:

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_282]
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:417) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1748) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_282]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_282]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    ... 10 more

I assume it is because it will spawn a completely new cluster for each run.

Can I somehow set the jobId, or am I trying to do something unsupported/bad?

Thanks for advice.

L.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html
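For completeness, the History server side of this setup can be sketched as a flink-conf.yaml fragment. This is an assumption, not taken from the thread: only the archive dir and the S3/MinIO options appear in the command above; the refresh interval, address, and port shown here are illustrative values:

```yaml
# Hypothetical flink-conf.yaml fragment for the separate History Server
# deployment; only the archive dir and S3 options come from the thread.
historyserver.archive.fs.dir: s3://flink/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
s3.endpoint: http://minio-1616518256:9000
s3.path-style-access: true
s3.ssl.enabled: false
```

With a distinct JobID per run, each completed job is archived under its own path in s3://flink/completed-jobs/ and the History server can list them all instead of overwriting a single entry.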