Re: Native kubernetes execution and History server

2021-03-25 Thread Yang Wang
Thanks Guowei for the comments and Lukáš Drbal for sharing the feedback.

I think this is not limited to Kubernetes application mode: for Yarn
application and standalone application deployments as well, the job id
will be set to ZERO if it is not configured explicitly in HA mode.

For a standalone application, we could use "--job-id" to specify a
user-defined job id, for example as in the sketch below.
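
A minimal sketch (the entrypoint class and the id value are only
placeholders; the job id must be a 32-character hex string):

./bin/standalone-job.sh start \
    --job-classname org.apache.flink.examples.java.wordcount.WordCount \
    --job-id 00000000000000000000000000000001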

However, for Yarn and Kubernetes applications we do not have a public
config option for this. Since we might support multiple jobs in a single
Flink application with HA enabled in the future, introducing such a
public config option may be inopportune.


Best,
Yang

Re: Native kubernetes execution and History server

2021-03-25 Thread Lukáš Drbal
Hello Guowei,

I just checked it and it works!

Thanks a lot!

Here is a workaround which uses a UUID as the jobId:
-D\$internal.pipeline.job-id=$(cat /proc/sys/kernel/random/uuid|tr -d "-")
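
In context, a sketch of the full run-application call with this in place
(same options as in my CronJob spec below in the thread; the S3 and
archive options are omitted here for brevity):

JOB_ID=$(cat /proc/sys/kernel/random/uuid | tr -d "-")  # 32 hex chars, a valid Flink JobID
/opt/flink/bin/flink run-application --target kubernetes-application \
  -D\$internal.pipeline.job-id="${JOB_ID}" \
  -Dkubernetes.service-account=flink-service-account \
  -Dkubernetes.cluster-id=batch-job-cluster \
  -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest \
  local:///opt/flink/usrlib/job.jar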


L.

Re: Native kubernetes execution and History server

2021-03-25 Thread Guowei Ma
Hi,
Thanks for providing the logs. From the logs, this is a known bug [1].
Maybe you could use `$internal.pipeline.job-id` to set your own job id
(thanks to Wang Yang). But keep in mind that this option is only for
internal use and may change in a future release, so you should keep an
eye on [1] for the proper solution.

[1] https://issues.apache.org/jira/browse/FLINK-19358

Best,
Guowei


Re: Native kubernetes execution and History server

2021-03-25 Thread Lukáš Drbal
Hello,

Sure. Here is the log from the first run, which succeeded:
https://pastebin.com/tV75ZS5S
and here is the log from the second run (it is the same for all
subsequent runs):
https://pastebin.com/pwTFyGvE

My Dockerfile is pretty simple, just the WordCount example plus S3 support:

FROM flink:1.12.2

# Job jar goes into usrlib so it can be referenced as local:///opt/flink/usrlib/...
RUN mkdir -p $FLINK_HOME/usrlib
COPY flink-examples-batch_2.12-1.12.2-WordCount.jar $FLINK_HOME/usrlib/wordcount.jar

# S3 filesystem plugin used for the archive dir and the HA storage
RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/
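
For completeness, the image gets built and pushed into the local registry
roughly like this (standard docker commands; the registry at
localhost:5000 is just my local setup):

docker build -t localhost:5000/batch-flink-app-v3:latest .
docker push localhost:5000/batch-flink-app-v3:latest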

Thanks!

Re: Native kubernetes execution and History server

2021-03-25 Thread Guowei Ma
Hi,
After some offline discussion with Wang Yang, it seems that there might
have been a jobmanager failover. Could you share the full jobmanager log?
Best,
Guowei


Native kubernetes execution and History server

2021-03-24 Thread Lukáš Drbal
Hi,

I would like to use native Kubernetes execution [1] for one batch job and
let Kubernetes handle the scheduling. Flink version: 1.12.2.

Kubernetes job:
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: scheduled-job
spec:
  schedule: "*/1 * * * *"
  jobTemplate:
    spec:
      template:
        metadata:
          labels:
            app: super-flink-batch-job
        spec:
          containers:
          - name: runner
            image: localhost:5000/batch-flink-app-v3:latest
            imagePullPolicy: Always
            command:
              - /bin/sh
              - -c
              - >-
                /opt/flink/bin/flink run-application
                --target kubernetes-application
                -Dkubernetes.service-account=flink-service-account
                -Dkubernetes.rest-service.exposed.type=NodePort
                -Dkubernetes.cluster-id=batch-job-cluster
                -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
                -Ds3.endpoint=http://minio-1616518256:9000
                -Ds3.access-key=ACCESSKEY
                -Ds3.secret-key=SECRETKEY
                -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
                -Ds3.path-style-access=true
                -Ds3.ssl.enabled=false
                -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
                -Dhigh-availability.storageDir=s3://flink/flink-ha
                local:///opt/flink/usrlib/job.jar
          restartPolicy: OnFailure


This works well for me, but I would also like to write the job result to
the archive path and show it in the History server (which runs as a
separate deployment in k8s).

Every run creates the job with the same JobId, which obviously leads to:

Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
already been submitted.
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
~[?:1.8.0_282]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
~[?:1.8.0_282]
at
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_282]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_282]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_282]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
~[flink-dist_2.12-1.12.2.jar:1.12.2]
... 10 more

I assume this is because a completely new cluster is spawned for each run.

Can I somehow set the jobId, or am I trying to do something unsupported/bad?

Thanks for any advice.

L.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html