Re: [DISCUSS] Best practice to run flink on kubernetes

2019-09-29 Thread Yang Wang
Hi dev and users,

I just want to revive this discussion because we have made some meaningful
progress on the Kubernetes native integration. I have made a draft
implementation to complete the PoC. The CLI and submission are both working
as expected. The design doc[1] has been updated, including the detailed
submission process, the CLI and YAML user interfaces, and the
implementation plan. All comments and suggestions are welcome.

BTW, we gave a talk at the Alibaba Apsara Conference last Friday in the
"Big Data Ecosystem" session[2]. We heard that many companies and users are
planning to migrate their big data workloads to Kubernetes clusters. By
co-locating them with online services, they can get better resource
utilization and reduce cost. For Flink, as an important case, dynamic
resource allocation is a basic requirement. That's why we want to speed up
the progress.


Best,
Yang

[1].
https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing
[2].
https://www.alibabacloud.com/zh/apsara-conference-2019?spm=a2c4e.11165380.1395221.13



Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-30 Thread Yang Wang
Hi Zhenghua,

You are right. For a per-job cluster, the TaskManagers will be allocated
dynamically by the KubernetesResourceManager. For a session cluster, we
hope TaskManagers could be pre-allocated, even though that does not work
yet. Please see the doc[1] for more details.




Hi Thomas,

We have no doubt that Flink only needs to support #1 and #3. For #1, we
need external deployment management tools to make it production-ready. I
also think a Kubernetes operator is a good choice. It makes managing
multiple Flink jobs and long-running streaming applications easier.

Also, some companies have their own Flink job management platform. Platform
users submit Flink jobs through a web UI, update the Flink configuration,
and restart the job.

For #3, we just want to make it possible to start a Flink job cluster or
session cluster through the CLI. Users who are used to running Flink
workloads on YARN will find it very convenient to migrate to a Kubernetes
cluster. Compared to #1, dynamic resource allocation is an important
advantage. Maybe it could also be introduced to #1 in some way in the
future.




[1].
https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing


Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-29 Thread Thomas Weise
Till had already summed it up, but I want to emphasize that Flink as
project only needs to provide #1 (reactive mode) and #3 (active mode, which
necessarily is tied to the cluster manager of choice). The latter would be
needed for Flink jobs to be elastic (in the future), although we may want
to discuss how such capability can be made easier with #1 as well.

For users #1 alone is of little value, since they need to solve their
deployment problem. So it will be good to list options such as the Lyft
Flink k8s operator on the ecosystem page and possibly point to that from
the Flink documentation as well.

I also want to point out that #3, while it looks easy to start with, has an
important limitation when it comes to managing long-running streaming
applications. Such an application will essentially be a sequence of jobs
that come and go across stateful upgrades or rollbacks. Any solution that
is designed to manage a single Flink job instance can't address that need.
That is why the k8s operator was created. It specifically understands the
concept of an application.

Thomas



Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-28 Thread Zhenghua Gao
Thanks Yang for bringing this up. I think option 1 is very useful for early
adopters. People who do not know much about k8s can easily set it up on
minikube to have a taste.

For options 2 and 3, I prefer option 3 because I am familiar with YARN and
don't know many k8s concepts. And I have some doubts about starting a
session cluster in option 3:

> ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm flink-session-example
> -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT

Does the -n option mean the number of TaskManagers?
Do we pre-launch TaskManager pods, or request and launch TaskManager pods
dynamically?

*Best Regards,*
*Zhenghua Gao*


On Fri, Aug 9, 2019 at 9:12 PM Yang Wang  wrote:

> Hi all,
>
> Currently, cloud native architectures have been introduced to many
> companies in production. They use Kubernetes to run deep learning, web
> servers, etc. If we could deploy per-job/session Flink clusters on
> Kubernetes so they run mixed with other workloads, cluster resource
> utilization would be better. Also, it would be much easier for many
> Kubernetes users to give Flink a try.
>
> By now we have three options to run flink jobs on k8s.
>
> [1]. Create the jm/tm/service YAML files and apply them; you will then
> get a Flink standalone cluster on k8s. Use flink run to submit a job to
> the existing Flink cluster. Some companies may have their own deployment
> system to manage the Flink cluster.
>
> [2]. Use a flink-k8s-operator to manage multiple Flink clusters,
> including session and per-job clusters. It can manage the complete
> deployment lifecycle of the application. I think this option is really
> easy to use for k8s users. They are familiar with the k8s operator
> pattern, kubectl, and other k8s tools. They can debug and run the Flink
> cluster just like other k8s applications.
>
> [3]. Native integration with k8s: use flink run or kubernetes-session.sh
> to start a Flink cluster. It is very similar to submitting a Flink
> cluster to YARN. The KubernetesClusterDescriptor talks to the k8s API
> server to start a Flink master deployment of size 1. The
> KubernetesResourceManager dynamically allocates resources from k8s to
> start TaskManagers on demand. This option is very easy for Flink users to
> get started with. In the simplest case, we just need to change
> '-m yarn-cluster' to '-m kubernetes-cluster'.
>
> We have made an internal implementation of option [3] and use it in
> production. After it is fully tested, we hope to contribute it to the
> community. Now we want to get some feedback about the three options. Any
> comments are welcome.
>
>
> > What do we need to prepare when starting a Flink cluster on k8s using
> native integration?
>
> Download the Flink release binary and create the ~/.kube/config file
> corresponding to the k8s cluster. That is all you need.
>
>
> > Flink Session cluster
>
> * start a session cluster
>
> ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm flink-session-example
> -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT
>
> * You will get an address to submit jobs; specify it through the '-ksa'
> option
>
> ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-session-example
> -ksa {x.x.x.x:12345} examples/streaming/WindowJoin.jar
>
>
> > Flink Job Cluster
>
> * running with the official flink image
>
> ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1
> -ki flink:latest examples/streaming/WindowJoin.jar
>
> * running with a user image
>
> ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1
> -ki flink-user:latest examples/streaming/WindowJoin.jar
>
>
>
> [1].
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
>
> [2].https://github.com/lyft/flinkk8soperator
>
> [3].
>
> https://docs.google.com/document/d/1Zmhui_29VASPcBOEqyMWnF3L6WEWZ4kedrCqya0WaAk/edit#
>


Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-14 Thread Yang Wang
Hi Till,


Thanks for your reply. I agree with you that both options 1 and 3 need to
be supported.

Option 1 is the reactive mode of resource management, where Flink is not
aware of the underlying cluster. If a user has limited resources to run
Flink jobs, this option will be very useful. On the other side, option 3 is
the active mode of resource management. Compared with option 1, the biggest
advantage is that we can allocate resources from the k8s cluster on demand.
Batch jobs in particular will benefit a lot from this.


I do not mean to abandon the proposal and implementation in FLINK-9953.
Actually, I have contacted the assignee (Chunhui Shi) to help review and
test the PRs. After all the basic implementations have been merged, the
production features will be considered.



Best,

Yang


Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-13 Thread Till Rohrmann
Hi Yang,

thanks for reviving the discussion about Flink's Kubernetes integration. In
a nutshell, I think that Flink should support option 1) and 3). Concretely,
option 1) would be covered by the reactive mode [1] which is not
necessarily bound to Kubernetes and works in all environments equally well.
Option 3) is the native Kubernetes integration which is described in the
design document. Actually, the discussion had been concluded already some
time ago and there are already multiple PRs open for adding this feature
[2]. So maybe you could check these PRs out and help the community
reviewing and merging this code. Based on this we could then think about
additions/improvements which are necessary.

For option 2), I think a Kubernetes operator would be a good project for
Flink's ecosystem website [3] and does not need to be necessarily part of
Flink's repository.

[1] https://issues.apache.org/jira/browse/FLINK-10407
[2] https://issues.apache.org/jira/browse/FLINK-9953
[3]
https://lists.apache.org/thread.html/9b873f9dc1dd56d79e0f71418b123def896ed02f57e84461bc0bacb0@%3Cdev.flink.apache.org%3E

Cheers,
Till


Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-11 Thread Yang Wang
Hi Kaibo,


I really appreciate that you shared your use case.

As you say, our users in production can also be divided into two groups.
The common users have more knowledge about Flink; they use the command line
to submit jobs and debug them from the JobManager and TaskManager logs in
Kubernetes. The platform users use YAML config files or a platform web UI
to submit Flink jobs.

Regarding your comments:

1. Of course, option 1 (standalone on k8s) should always work as expected.
Users can submit the jm/tm/svc resource files to start a Flink cluster.
Option 3 (k8s native integration) will support both resource files and
command line submission. The resource file below creates a Flink per-job
cluster.

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-word-count
spec:
  image: flink-wordcount:latest
  flinkConfig:
    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
  jobManagerConfig:
    resources:
      requests:
        memory: "1024Mi"
        cpu: "1"
  taskManagerConfig:
    taskSlots: 2
    resources:
      requests:
        memory: "1024Mi"
        cpu: "1"
  jobId: ""
  parallelism: 3
  jobClassName: "org.apache.flink.streaming.examples.wordcount.WordCount"
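A platform that prefers to generate such a resource rather than hand-write
YAML could build the spec as plain data first. A minimal sketch; the field
names follow the proposed example above, which is a design proposal in this
thread, not a released Kubernetes or Flink API:

```python
# Sketch: generate the proposed per-job cluster resource programmatically.
# Field names mirror the example spec above (a proposal, not a released API).
import json


def requests(memory: str, cpu: str) -> dict:
    """Resource requests block shared by JobManager and TaskManager."""
    return {"requests": {"memory": memory, "cpu": cpu}}


def per_job_cluster_spec(name: str, image: str, job_class: str,
                         parallelism: int = 3, task_slots: int = 2) -> dict:
    return {
        "apiVersion": "extensions/v1beta1",
        "kind": "Deployment",
        "metadata": {"name": name},
        "spec": {
            "image": image,
            "flinkConfig": {
                "state.checkpoints.dir":
                    "file:///checkpoints/flink/externalized-checkpoints",
            },
            "jobManagerConfig": {"resources": requests("1024Mi", "1")},
            "taskManagerConfig": {
                "taskSlots": task_slots,
                "resources": requests("1024Mi", "1"),
            },
            "parallelism": parallelism,
            "jobClassName": job_class,
        },
    }


spec = per_job_cluster_spec(
    "flink-word-count", "flink-wordcount:latest",
    "org.apache.flink.streaming.examples.wordcount.WordCount")
# Dump as JSON (a valid YAML subset) for submission tooling.
print(json.dumps(spec, indent=2))
```

The dict could then be serialized to YAML (e.g. with PyYAML) or submitted
as JSON, since the k8s API server accepts both.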

2. The ability to pass a job class name will be retained. The class should
be found in the classpath of the TaskManager image. The Flink per-job
cluster described by the YAML resource in section 1 could also be submitted
with the flink command:

flink run -m kubernetes-cluster -p 3 -knm flink-word-count -ki
flink-wordcount:latest -kjm 1024 -ktm 1024 -kD kubernetes.jobmanager.cpu=1
-kD kubernetes.taskmanager.cpu=1 -kjid 
-kjc org.apache.flink.streaming.examples.wordcount.WordCount -kD
state.checkpoints.dir=file:///checkpoints/flink/externalized-checkpoints

3. The job ID could also be specified with -kjid, just like in the command
above.

In a nutshell, option 3 should have all the abilities of option 1. Both
common users and platform users are satisfied.
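To make the mapping between such a config and the command line concrete,
the argument assembly can be sketched as follows. Note the -k* flags
(-knm, -ki, -kjm, -ktm, -kjc, -kD) are the ones proposed in this thread,
not part of a released Flink CLI:

```python
# Sketch: assemble the proposed 'flink run' arguments for a per-job cluster.
# The -k* flags are this thread's design proposal, not a released Flink CLI.
def kubernetes_run_args(name, image, jm_mb, tm_mb, parallelism, job_class,
                        dynamic_props=None):
    args = ["run", "-m", "kubernetes-cluster", "-p", str(parallelism),
            "-knm", name,            # cluster name
            "-ki", image,            # container image
            "-kjm", str(jm_mb),      # JobManager memory (MB)
            "-ktm", str(tm_mb),      # TaskManager memory (MB)
            "-kjc", job_class]       # job entry class
    # Arbitrary Flink/Kubernetes options are passed through as -kD key=value.
    for key, value in sorted((dynamic_props or {}).items()):
        args += ["-kD", f"{key}={value}"]
    return args


argv = kubernetes_run_args(
    "flink-word-count", "flink-wordcount:latest", 1024, 1024, 3,
    "org.apache.flink.streaming.examples.wordcount.WordCount",
    {"kubernetes.jobmanager.cpu": 1, "kubernetes.taskmanager.cpu": 1})
print("flink " + " ".join(argv))
```

A platform could then invoke the CLI with this argument list instead of
templating a shell string by hand.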



Best,

Yang


Kaibo Zhou  于2019年8月11日周日 下午1:23写道:

> Thanks for bringing this up. Obviously, options 2 and 3 are both useful
> for Flink users on Kubernetes. But option 3 is easier for users who do
> not have many Kubernetes concepts; they can start Flink on Kubernetes
> quickly, so I think it should have a higher priority.
>
> I have worked for some time to integrate Flink with our platform based on
> Kubernetes, and have some concerns about option 3 from the platform
> user's perspective.
>
> First, I think users can be divided into common users and downstream
> platform users.
>
> For common users, kubernetes-session.sh (or yarn-session.sh) is
> convenient: just run the shell script and get the JobManager address,
> then run ./bin/flink to submit a job.
>
> But for platform users, the shell scripts are not friendly to integrate.
> I need to use a Java ProcessBuilder to run the shell script and redirect
> stdout/stderr, parse the stdout log to get the jobId, handle the exit
> code, and add some idempotence logic to avoid submitting duplicate jobs.
>
> The way our platform integrates with flink on k8s is:
> 1. Generate a job Id, and prepare jobmanager/taskmanager/service/configmap
> resource files.
> In the jobmanager and taskmanager resource file, we defined an
> initContainer to download user jar from http/hdfs/s3..., so the user jar is
> already on the jm and tm pod before they start. And
> StandaloneJobClusterEntryPoint can accept "--job-id" to pass pre-generated
> jobId and accept "--job-classname" to pass user jar entry class and other
> args[1].
>
> 2. Submit resource files to k8s directly, and that is all. Not need other
> steps, e.g. upload/submit jar to flink, and k8s guarantee the idempotence
> natural, the same resources will be ignored.
>
> 3. Just use the pre-configured job id to query status, the platform knows
> the job id.
>
> The above steps are convenient for platform users. So my concern for option
> 3 is:
> 1. Besides to use kubernetes-session.sh to submit a job, can we retain the
> ability to let users submit k8s resources files directly, not forced to
> submit jobs from shell scripts. As you know, everything in kubernetes is a
> resource, submit a resource to kubernetes is more natural.
>
> 2. Retain the ability to pass job-classname to start Flink Job Cluster, so
> the platform users do not need a step to submit jar whether from
> ./bin/flink or from restful API.
> And for Flink Session Cluster, the platform uses can submit kubernetes
> resource files to start a session cluster, and then submit jar job from
> restful API to avoid call the shell scripts.
>
> 3. Retain the ability to pass job-id, It is not convenient and friendly to
> find which job id you have just submitted whether parse the submit log or
> query jobmanager restful API. And it is impossible to find the jobId in 

Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-11 Thread Yang Wang
Hi Jeff,

Thank you for your attention. You are right, the design doc is out of date.
I will try to contact Till and Sun Jin to get edit permission, and then
update the design doc.

Currently we get the k8s cluster information from the kube config file.
You may have multiple contexts in your kube config file; just set the
current-context to the cluster you want to submit the flink cluster to. Also,
I think we could use a flink config option (api server address or context
name) to override the current-context.
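To illustrate, here is a minimal sketch of how the current-context field
selects the target cluster. The kubeconfig below is a fabricated example, not
a real cluster config; switching against a real ~/.kube/config would use
`kubectl config use-context`:

```shell
# Fabricated minimal kubeconfig, just to show where current-context lives.
cat > kubeconfig-example.yaml <<'EOF'
apiVersion: v1
kind: Config
current-context: prod-cluster
contexts:
- name: prod-cluster
- name: test-cluster
EOF

# The client reads this field to decide which cluster to talk to.
current=$(sed -n 's/^current-context: //p' kubeconfig-example.yaml)
echo "would submit the flink cluster to context: ${current}"

# Switching clusters (against a real ~/.kube/config) would be:
#   kubectl config use-context test-cluster
```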


Best,
Yang


Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-10 Thread Kaibo Zhou
Thanks for bringing this up. Obviously, options 2 and 3 are both useful for
flink users on kubernetes. But option 3 is easier for users who are not
familiar with many kubernetes concepts; they can start flink on kubernetes
quickly, so I think it should have a higher priority.

I have spent some time integrating flink with our kubernetes-based platform,
and have some concerns about option 3 from a platform user's perspective.

First, I think users can be divided into common users and downstream
platform users.

For common users, kubernetes-session.sh (or yarn-session.sh) is convenient:
just run the shell script and get the jobmanager address, then run
./bin/flink to submit a job.

But for platform users, the shell scripts are not friendly to integrate
with. I need to use Java ProcessBuilder to run the shell script and redirect
stdout/stderr, parse the stdout log to get the jobId, handle the exit code,
and implement some idempotence logic to avoid submitting duplicate jobs.

The way our platform integrates with flink on k8s is:
1. Generate a job id, and prepare the jobmanager/taskmanager/service/configmap
resource files.
In the jobmanager and taskmanager resource files, we define an initContainer
to download the user jar from http/hdfs/s3..., so the user jar is already on
the jm and tm pods before they start. StandaloneJobClusterEntryPoint accepts
"--job-id" to pass the pre-generated jobId and "--job-classname" to pass the
user jar entry class and other args[1].

2. Submit the resource files to k8s directly, and that is all. No other steps
are needed, e.g. uploading/submitting the jar to flink, and k8s guarantees
idempotence naturally: the same resources will be ignored.

3. Just use the pre-configured job id to query status; the platform already
knows the job id.
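As an illustration, a trimmed-down sketch of what step 1's jobmanager
resource might look like. The image names, jar URL, volume names and the
exact entrypoint args are made up; only the initContainer pattern and the
StandaloneJobClusterEntryPoint flags come from the description above:

```yaml
# Hypothetical jobmanager pod spec fragment; only the jar-download
# initContainer and the --job-id/--job-classname flags reflect the flow
# described above.
spec:
  initContainers:
  - name: fetch-user-jar
    image: busybox                 # illustrative; any image with wget works
    command: ["wget", "-O", "/opt/flink/usrlib/job.jar",
              "http://artifact-store.example.com/wordcount.jar"]
    volumeMounts:
    - name: usrlib
      mountPath: /opt/flink/usrlib
  containers:
  - name: jobmanager
    image: flink:latest
    # passed through to StandaloneJobClusterEntryPoint
    args: ["--job-id", "00000000000000000000000000000001",
           "--job-classname",
           "org.apache.flink.streaming.examples.wordcount.WordCount"]
    volumeMounts:
    - name: usrlib
      mountPath: /opt/flink/usrlib
  volumes:
  - name: usrlib
    emptyDir: {}
```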

The above steps are convenient for platform users. So my concerns about
option 3 are:
1. Besides using kubernetes-session.sh to submit a job, can we retain the
ability to let users submit k8s resource files directly, instead of forcing
them to submit jobs from shell scripts? As you know, everything in kubernetes
is a resource; submitting a resource to kubernetes is more natural.

2. Retain the ability to pass job-classname to start a Flink Job Cluster, so
platform users do not need an extra step to submit the jar, whether from
./bin/flink or from the restful API.
For a Flink Session Cluster, platform users can submit kubernetes resource
files to start the session cluster, and then submit the jar job via the
restful API to avoid calling the shell scripts.

3. Retain the ability to pass job-id. It is neither convenient nor friendly
to find the job id you have just submitted, whether by parsing the submit log
or by querying the jobmanager restful API. And it is impossible to find the
jobId in the session cluster scenario, where there may be many jobs with the
same name and the same submit time.
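The whole platform flow above can be sketched in a few lines of shell. The
file names and the one-line template are made up for illustration; real
resource files would be full k8s manifests like the fragment in step 1:

```shell
set -eu

# 1. Pre-generate a job id (Flink JobIDs are 32 hex characters).
job_id=$(head -c 16 /dev/urandom | od -An -tx1 | tr -d ' \n')

# 2. Render the jobmanager manifest from a template, injecting the id
#    into the StandaloneJobClusterEntryPoint arguments.
cat > jobmanager-template.yaml <<'EOF'
args: ["--job-id", "@JOB_ID@",
       "--job-classname", "org.apache.flink.streaming.examples.wordcount.WordCount"]
EOF
sed "s/@JOB_ID@/${job_id}/" jobmanager-template.yaml > jobmanager.yaml

# 3. Submitting is then a plain, idempotent resource apply; re-applying
#    the same rendered files is a no-op, so no duplicate jobs:
#      kubectl apply -f jobmanager.yaml
# 4. Status queries just reuse the pre-generated id:
#      curl http://<jobmanager-address>/jobs/${job_id}
echo "rendered manifest for job ${job_id}"
```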

I think it's better for option 3 to retain these features already provided by
StandaloneJobClusterEntryPoint. This will make flink easier to integrate with
other kubernetes-based platforms.

Thanks
Kaibo

[1].
https://github.com/apache/flink/blob/master/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java#L45


Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-09 Thread Jeff Zhang
Thanks Yang. K8s native integration is very necessary and important for
the adoption of flink, IMO.
I notice that the design doc was written in 2018; are there any changes or
updates?

>>> Download the flink release binary and create the ~/.kube/config file
corresponding to the k8s cluster. That is all you need.

How can I specify which k8s cluster to run on in case I have multiple k8s
clusters? Can I do it by specifying the flink cluster in the flink cli?

Yang Wang wrote on Fri, Aug 9, 2019 at 9:12 PM:

> Hi all,
>
> Currently, cloud native architectures have been introduced at many companies
> in production. They use kubernetes to run deep learning, web servers, etc.
> If we could deploy per-job/session flink clusters on kubernetes and mix-run
> them with other workloads, the cluster resource utilization would be better.
> It would also be easier for many kubernetes users to give flink a try.
>
> By now we have three options to run flink jobs on k8s.
>
> [1]. Create the jm/tm/service yaml files and apply them; you will then get a
> flink standalone cluster on k8s. Use flink run to submit jobs to the
> existing flink cluster. Some companies may have their own deployment system
> to manage the flink cluster.
>
> [2]. Use flink-k8s-operator to manage multiple flink clusters, both session
> and per-job. It can manage the complete deployment lifecycle of the
> application. I think this option is really easy to use for k8s users. They
> are familiar with the k8s operator, kubectl and other k8s tools, and can
> debug and run the flink cluster just like other k8s applications.
>
> [3]. Native integration with k8s: use flink run or kubernetes-session.sh to
> start a flink cluster. It is very similar to submitting a flink cluster to
> Yarn. KubernetesClusterDescriptor talks to the k8s api server to start a
> flink master deployment of 1, and KubernetesResourceManager dynamically
> allocates resources from k8s to start task managers on demand. This option
> makes it very easy for flink users to get started. In the simplest case, we
> just need to change '-m yarn-cluster' to '-m kubernetes-cluster'.
>
> We have made an internal implementation of option [3] and use it in
> production. After it is fully tested, we hope to contribute it to the
> community. Now we want to get some feedback on the three options. Any
> comments are welcome.
>
>
> > What do we need to prepare to start a flink cluster on k8s using the
> > native integration?
>
> Download the flink release binary and create the ~/.kube/config file
> corresponding to the k8s cluster. That is all you need.
>
>
> > Flink Session cluster
>
> * start a session cluster
>
> ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm flink-session-example
> -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT
>
> *  You will get an address for submitting jobs; specify it through the '-ksa' option
>
> ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-session-example
> -ksa {x.x.x.x:12345} examples/streaming/WindowJoin.jar
>
>
> > Flink Job Cluster
>
> * running with the official flink image
>
> ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1
> -ki flink:latest examples/streaming/WindowJoin.jar
>
> * running with a user image
>
> ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1
> -ki flink-user:latest examples/streaming/WindowJoin.jar
>
>
>
> [1].
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
>
> [2]. https://github.com/lyft/flinkk8soperator
>
> [3].
>
> https://docs.google.com/document/d/1Zmhui_29VASPcBOEqyMWnF3L6WEWZ4kedrCqya0WaAk/edit#
>


-- 
Best Regards

Jeff Zhang