Hello Guowei, I just checked it and it works!
Thanks a lot! Here is a workaround which uses a random UUID as the jobId:

-D\$internal.pipeline.job-id=$(cat /proc/sys/kernel/random/uuid | tr -d "-")
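For reference, this is roughly how the flag slots into the submission command (trimmed here to the relevant flags; the full flag set is in the CronJob spec quoted below). The backslash stops the shell from expanding the literal `$internal` prefix of the option key, while the $(...) command substitution still runs:

# Generate a fresh 32-hex-char job id on every invocation, then submit.
/opt/flink/bin/flink run-application --target kubernetes-application \
    -Dkubernetes.cluster-id=batch-job-cluster \
    -D\$internal.pipeline.job-id=$(cat /proc/sys/kernel/random/uuid | tr -d "-") \
    local:///opt/flink/usrlib/job.jar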
L.

On Thu, Mar 25, 2021 at 11:01 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi,
> Thanks for providing the logs. From the logs, this is a known bug. [1]
> Maybe you could use `$internal.pipeline.job-id` to set your own job-id.
> (Thanks to Wang Yang.) But keep in mind this is only for internal use and
> may be changed in some release, so you should keep an eye on [1] for the
> correct solution.
>
> [1] https://issues.apache.org/jira/browse/FLINK-19358
>
> Best,
> Guowei
>
>
> On Thu, Mar 25, 2021 at 5:31 PM Lukáš Drbal <lukas.dr...@gmail.com> wrote:
>
>> Hello,
>>
>> Sure. Here is the log from the first run, which succeeded:
>> https://pastebin.com/tV75ZS5S
>> and here is the log from the second run (it is the same for all
>> subsequent runs): https://pastebin.com/pwTFyGvE
>>
>> My Dockerfile is pretty simple, just WordCount + S3:
>>
>> FROM flink:1.12.2
>>
>> RUN mkdir -p $FLINK_HOME/usrlib
>> COPY flink-examples-batch_2.12-1.12.2-WordCount.jar $FLINK_HOME/usrlib/wordcount.jar
>>
>> RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
>> COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/
>>
>> Thanks!
>>
>> On Thu, Mar 25, 2021 at 9:24 AM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi,
>>> After some discussion with Wang Yang offline, it seems that there might
>>> be a jobmanager failover, so would you like to share the full jobmanager
>>> log?
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal <lukas.dr...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I would like to use native Kubernetes execution [1] for one batch job
>>>> and let Kubernetes handle the scheduling. Flink version: 1.12.2.
>>>>
>>>> Kubernetes job:
>>>>
>>>> apiVersion: batch/v1beta1
>>>> kind: CronJob
>>>> metadata:
>>>>   name: scheduled-job
>>>> spec:
>>>>   schedule: "*/1 * * * *"
>>>>   jobTemplate:
>>>>     spec:
>>>>       template:
>>>>         metadata:
>>>>           labels:
>>>>             app: super-flink-batch-job
>>>>         spec:
>>>>           containers:
>>>>             - name: runner
>>>>               image: localhost:5000/batch-flink-app-v3:latest
>>>>               imagePullPolicy: Always
>>>>               command:
>>>>                 - /bin/sh
>>>>                 - -c
>>>>                 - /opt/flink/bin/flink run-application
>>>>                   --target kubernetes-application
>>>>                   -Dkubernetes.service-account=flink-service-account
>>>>                   -Dkubernetes.rest-service.exposed.type=NodePort
>>>>                   -Dkubernetes.cluster-id=batch-job-cluster
>>>>                   -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
>>>>                   -Ds3.endpoint=http://minio-1616518256:9000
>>>>                   -Ds3.access-key=ACCESSKEY
>>>>                   -Ds3.secret-key=SECRETKEY
>>>>                   -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
>>>>                   -Ds3.path-style-access=true
>>>>                   -Ds3.ssl.enabled=false
>>>>                   -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>                   -Dhigh-availability.storageDir=s3://flink/flink-ha
>>>>                   local:///opt/flink/usrlib/job.jar
>>>>           restartPolicy: OnFailure
>>>>
>>>> This works well for me, but I would like to write the result to the
>>>> archive path and show it in the History Server (running as a separate
>>>> deployment in K8s).
>>>>
>>>> Each run it creates JobId=00000000000000000000000000000000, which
>>>> obviously leads to:
>>>>
>>>> Caused by: java.util.concurrent.ExecutionException:
>>>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>>>> already been submitted.
>>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282]
>>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_282]
>>>>     at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>>     at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>>     at org.apache.flink.api.java.DataSet.collect(DataSet.java:417) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>>     at org.apache.flink.api.java.DataSet.print(DataSet.java:1748) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>>     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96) ~[?:?]
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_282]
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_282]
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282]
>>>>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>>     ... 10 more
>>>>
>>>> I assume this is because it spawns a completely new cluster for each run.
>>>>
>>>> Can I somehow set the jobId, or am I trying to do something
>>>> unsupported/bad?
>>>>
>>>> Thanks for any advice.
>>>>
>>>> L.
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html
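For anyone finding this thread later: the all-zero id in the exception above is the giveaway. With high availability enabled, application mode submits the job under a fixed default JobID unless you supply one, so every CronJob run collides with the previous one in the HA registry; that is the bug tracked in FLINK-19358, linked earlier. Any 32-hex-character string is a valid custom id, since a Flink JobID is 16 bytes rendered as 32 hex characters, and that is exactly what the dash-stripped kernel UUID from the workaround at the top produces:

# A kernel UUID is 36 characters including 4 dashes; stripping the
# dashes leaves the 32 hex characters a Flink JobID consists of.
job_id=$(cat /proc/sys/kernel/random/uuid | tr -d "-")
echo "${#job_id}"   # prints 32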