Hello Guowei,

I just checked it and it works!

Thanks a lot!

Here is a workaround that uses a UUID as the job ID:
-D\$internal.pipeline.job-id=$(cat /proc/sys/kernel/random/uuid|tr -d "-")
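
As a rough sketch of the same idea outside the CronJob command, the ID can be generated and sanity-checked before it is passed to Flink. This assumes a Linux host where /proc/sys/kernel/random/uuid exists; the helper name make_job_id and the variable JOB_ID are hypothetical, not part of Flink. A UUID without dashes is 32 hex digits, the same shape as the all-zero job ID quoted further down in this thread.

```shell
#!/bin/sh
# Hypothetical helper: print a 32-character hex string usable as a job ID.
make_job_id() {
  # Read a random UUID from the kernel and remove the dashes.
  id="$(tr -d '-' < /proc/sys/kernel/random/uuid)" || return 1
  # Reject anything that is not exactly 32 lowercase hex characters.
  case "$id" in
    *[!0-9a-f]*) return 1 ;;
  esac
  [ "${#id}" -eq 32 ] || return 1
  printf '%s\n' "$id"
}

JOB_ID="$(make_job_id)" || echo "could not generate a job id" >&2

# The backslash keeps the shell from expanding $internal as a variable,
# so the option name reaches Flink unchanged.
printf '%s\n' "-D\$internal.pipeline.job-id=$JOB_ID"
```

The backslash in front of $internal matters for the same reason in the one-liner above: without it, /bin/sh -c would substitute an empty variable and the option name would be lost.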


L.

On Thu, Mar 25, 2021 at 11:01 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi,
> Thanks for providing the logs. They show that this is a known bug [1].
> You could use `$internal.pipeline.job-id` to set your own job ID
> (thanks to Wang Yang).
> But keep in mind that this option is for internal use only and may change
> in a future release, so you should keep an eye on [1] for the proper fix.
>
> [1] https://issues.apache.org/jira/browse/FLINK-19358
>
> Best,
> Guowei
>
>
> On Thu, Mar 25, 2021 at 5:31 PM Lukáš Drbal <lukas.dr...@gmail.com> wrote:
>
>> Hello,
>>
>> Sure. Here is the log from the first run, which succeeded:
>> https://pastebin.com/tV75ZS5S
>> and here is the log from the second run (it's the same for all later runs):
>> https://pastebin.com/pwTFyGvE
>>
>> My Dockerfile is pretty simple; it just takes the WordCount example and adds S3:
>>
>> FROM flink:1.12.2
>>
>> RUN mkdir -p $FLINK_HOME/usrlib
>> COPY flink-examples-batch_2.12-1.12.2-WordCount.jar $FLINK_HOME/usrlib/wordcount.jar
>>
>> RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
>> COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/
>>
>> Thanks!
>>
>> On Thu, Mar 25, 2021 at 9:24 AM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi,
>>> After some discussion with Wang Yang offline, it seems that there might
>>> have been a jobmanager failover. Could you share the full jobmanager log?
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal <lukas.dr...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I would like to use native Kubernetes execution [1] for one batch job
>>>> and leave the scheduling to Kubernetes. Flink version: 1.12.2.
>>>>
>>>> Kubernetes job:
>>>> apiVersion: batch/v1beta1
>>>> kind: CronJob
>>>> metadata:
>>>>   name: scheduled-job
>>>> spec:
>>>>   schedule: "*/1 * * * *"
>>>>   jobTemplate:
>>>>     spec:
>>>>       template:
>>>>         metadata:
>>>>           labels:
>>>>             app: super-flink-batch-job
>>>>         spec:
>>>>           containers:
>>>>           - name: runner
>>>>             image: localhost:5000/batch-flink-app-v3:latest
>>>>             imagePullPolicy: Always
>>>>             command:
>>>>               - /bin/sh
>>>>               - -c
>>>>               - /opt/flink/bin/flink run-application --target kubernetes-application
>>>>                 -Dkubernetes.service-account=flink-service-account
>>>>                 -Dkubernetes.rest-service.exposed.type=NodePort
>>>>                 -Dkubernetes.cluster-id=batch-job-cluster
>>>>                 -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
>>>>                 -Ds3.endpoint=http://minio-1616518256:9000 -Ds3.access-key=ACCESSKEY
>>>>                 -Ds3.secret-key=SECRETKEY
>>>>                 -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
>>>>                 -Ds3.path-style-access=true -Ds3.ssl.enabled=false
>>>>                 -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>                 -Dhigh-availability.storageDir=s3://flink/flink-ha
>>>>                 local:///opt/flink/usrlib/job.jar
>>>>           restartPolicy: OnFailure
>>>>
>>>>
>>>> This works well for me, but I would like to write the result to the
>>>> archive path and show it in the History Server (running as a separate
>>>> deployment in Kubernetes).
>>>>
>>>> Every run creates JobId=00000000000000000000000000000000, which
>>>> obviously leads to
>>>>
>>>> Caused by: java.util.concurrent.ExecutionException:
>>>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>>>> already been submitted.
>>>> at
>>>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
>>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>> at
>>>> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
>>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>> at
>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
>>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>> at
>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
>>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>> at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
>>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>> at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
>>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>> at
>>>> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
>>>> ~[?:?]
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> ~[?:1.8.0_282]
>>>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>> at
>>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>> at
>>>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
>>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>>> ... 10 more
>>>>
>>>> I assume it is because it will spawn a completely new cluster for each
>>>> run.
>>>>
>>>> Can I somehow set the job ID, or am I trying to do something unsupported/bad?
>>>>
>>>> Thanks for any advice.
>>>>
>>>> L.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html
>>>>
>>>
