Re: [DISCUSS] FLIP-271: Autoscaling

2022-12-15 Thread ConradJam
Thanks to Gyula and Max for a great start, I'll try this feature out and
I'll raise it on issue 


Maximilian Michels  于2022年12月15日周四 02:37写道:

> A heads-up: Gyula just opened a PR with the code contribution based on the
> design: https://github.com/apache/flink-kubernetes-operator/pull/484
>
> We have run some tests based on the current state and achieved very good
> results thus far. We were able to cut the resources of some of the
> deployments by 50% yielding very stable configurations for mostly static
> data rates. Also, we could achieve good scaling decisions on high-volume
> pipelines with fluctuating traffic which remained backlog free despite many
> adjustments due to the varying traffic.
>
> One of the most pressing issues we will have to solve is an integration
> with the K8s scheduler to upfront reserve resources to not hit any resource
> limits after scaling. Scaling currently redeploys the entire application
> which has some risks because we surrender the pods for each scaling. This
> can perhaps be achieved with the Rescale API.
>
> -Max
>
> On Sat, Nov 26, 2022 at 3:02 AM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
> > Thanks for the reply.
> >
> > Gyula and Max.
> >
> > Prasanna
> >
> >
> > On Sat, 26 Nov 2022, 00:24 Maximilian Michels,  wrote:
> >
> > > Hi John, hi Prasanna, hi Rui,
> > >
> > > Gyula already gave great answers to your questions, just adding to it:
> > >
> > > >What’s the reason to add auto scaling to the Operator instead of to
> the
> > > JobManager?
> > >
> > > As Gyula mentioned, the JobManager is not the ideal place, at least not
> > > until Flink supports in-place autoscaling which is a related but
> > ultimately
> > > very different problem because it involves solving job reconfiguration
> at
> > > runtime. I believe the AdaptiveScheduler has moved into this direction
> > and
> > > there is nothing preventing us from using it in the future once it has
> > > evolved further. For now, going through the job redeployment route
> seems
> > > like the easiest and safest way.
> > >
> > > >Could we finally use the autoscaler as a outside tool? or run it as a
> > > separate java process?
> > >
> > > I think we could but I wouldn't make it a requirement for the first
> > > version. There is nothing preventing the autoscaler from running as a
> > > separate k8s/yarn deployment which would provide some of the same
> > > availability guarantees as the operator or any deployment has on
> > k8s/yarn.
> > > However, I think this increases complexity by a fair bit because the
> > > operator already has all the configuration and tooling to manage Flink
> > > jobs. I'm not at all opposed to coming up with a way to allow the
> > > autoscaler to run separately as well as with the k8s operator. I just
> > think
> > > it is out of scope for the first version to keep the complexity and
> scope
> > > under control.
> > >
> > > -Max
> > >
> > >
> > > On Fri, Nov 25, 2022 at 8:16 AM Rui Fan <1996fan...@gmail.com> wrote:
> > >
> > > > Hi Gyula
> > > >
> > > > Thanks for the clarification!
> > > >
> > > > Best
> > > > Rui Fan
> > > >
> > > > On Fri, Nov 25, 2022 at 1:50 PM Gyula Fóra 
> > wrote:
> > > >
> > > > > Rui, Prasanna:
> > > > >
> > > > > I am afraid that creating a completely independent autoscaler
> process
> > > > that
> > > > > works with any type of Flink clusters is out of scope right now due
> > to
> > > > the
> > > > > following reasons:
> > > > >
> > > > > If we were to create a new general process, we would have to
> > implement
> > > > high
> > > > > availability and a pluggable mechanism to durably store metadata
> etc.
> > > The
> > > > > process itself would also have to run somewhere so we would have to
> > > > provide
> > > > > integrations.
> > > > >
> > > > >  It would also not be able to scale clusters easily without adding
> > > > > Kubernetes-operator-like functionality to it, and if the user has
> to
> > do
> > > > it
> > > > > manually most of the value is already lost.
> > > > >
> > > > > Last but not least this would have the potential of interfering
> with
> > > > other
> > > > > actions the user might be currently doing, making the autoscaler
> > itself
> > > > > complex and more unreliable.
> > > > >
> > > > > These are all prohibitive reasons at this point. We already have a
> > > > > prototype that tackle these smoothly as part of the Kubernetes
> > > operator.
> > > > >
> > > > > Instead of trying to put the autoscaler somewhere else we might
> also
> > > > > consider supporting different cluster types within the Kubernetes
> > > > operator.
> > > > > While that might sound silly at first, it is of similar scope to
> your
> > > > > suggestions and could help the problem.
> > > > >
> > > > > As for the new config question, we could collectively decide to
> > > backport
> > > > > this feature to enable the autoscaler as it is a very minor change.
> > > > >
> > > > > Gyula
> > > > >
> > > > > On Fri, 25 Nov 2022 at 06:21, John Roesler 
> > > 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-12-14 Thread Maximilian Michels
A heads-up: Gyula just opened a PR with the code contribution based on the
design: https://github.com/apache/flink-kubernetes-operator/pull/484

We have run some tests based on the current state and achieved very good
results thus far. We were able to cut the resources of some of the
deployments by 50% yielding very stable configurations for mostly static
data rates. Also, we could achieve good scaling decisions on high-volume
pipelines with fluctuating traffic which remained backlog free despite many
adjustments due to the varying traffic.

One of the most pressing issues we will have to solve is an integration
with the K8s scheduler to upfront reserve resources to not hit any resource
limits after scaling. Scaling currently redeploys the entire application
which has some risks because we surrender the pods for each scaling. This
can perhaps be achieved with the Rescale API.

-Max

On Sat, Nov 26, 2022 at 3:02 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Thanks for the reply.
>
> Gyula and Max.
>
> Prasanna
>
>
> On Sat, 26 Nov 2022, 00:24 Maximilian Michels,  wrote:
>
> > Hi John, hi Prasanna, hi Rui,
> >
> > Gyula already gave great answers to your questions, just adding to it:
> >
> > >What’s the reason to add auto scaling to the Operator instead of to the
> > JobManager?
> >
> > As Gyula mentioned, the JobManager is not the ideal place, at least not
> > until Flink supports in-place autoscaling which is a related but
> ultimately
> > very different problem because it involves solving job reconfiguration at
> > runtime. I believe the AdaptiveScheduler has moved into this direction
> and
> > there is nothing preventing us from using it in the future once it has
> > evolved further. For now, going through the job redeployment route seems
> > like the easiest and safest way.
> >
> > >Could we finally use the autoscaler as a outside tool? or run it as a
> > separate java process?
> >
> > I think we could but I wouldn't make it a requirement for the first
> > version. There is nothing preventing the autoscaler from running as a
> > separate k8s/yarn deployment which would provide some of the same
> > availability guarantees as the operator or any deployment has on
> k8s/yarn.
> > However, I think this increases complexity by a fair bit because the
> > operator already has all the configuration and tooling to manage Flink
> > jobs. I'm not at all opposed to coming up with a way to allow the
> > autoscaler to run separately as well as with the k8s operator. I just
> think
> > it is out of scope for the first version to keep the complexity and scope
> > under control.
> >
> > -Max
> >
> >
> > On Fri, Nov 25, 2022 at 8:16 AM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Hi Gyula
> > >
> > > Thanks for the clarification!
> > >
> > > Best
> > > Rui Fan
> > >
> > > On Fri, Nov 25, 2022 at 1:50 PM Gyula Fóra 
> wrote:
> > >
> > > > Rui, Prasanna:
> > > >
> > > > I am afraid that creating a completely independent autoscaler process
> > > that
> > > > works with any type of Flink clusters is out of scope right now due
> to
> > > the
> > > > following reasons:
> > > >
> > > > If we were to create a new general process, we would have to
> implement
> > > high
> > > > availability and a pluggable mechanism to durably store metadata etc.
> > The
> > > > process itself would also have to run somewhere so we would have to
> > > provide
> > > > integrations.
> > > >
> > > >  It would also not be able to scale clusters easily without adding
> > > > Kubernetes-operator-like functionality to it, and if the user has to
> do
> > > it
> > > > manually most of the value is already lost.
> > > >
> > > > Last but not least this would have the potential of interfering with
> > > other
> > > > actions the user might be currently doing, making the autoscaler
> itself
> > > > complex and more unreliable.
> > > >
> > > > These are all prohibitive reasons at this point. We already have a
> > > > prototype that tackle these smoothly as part of the Kubernetes
> > operator.
> > > >
> > > > Instead of trying to put the autoscaler somewhere else we might also
> > > > consider supporting different cluster types within the Kubernetes
> > > operator.
> > > > While that might sound silly at first, it is of similar scope to your
> > > > suggestions and could help the problem.
> > > >
> > > > As for the new config question, we could collectively decide to
> > backport
> > > > this feature to enable the autoscaler as it is a very minor change.
> > > >
> > > > Gyula
> > > >
> > > > On Fri, 25 Nov 2022 at 06:21, John Roesler 
> > wrote:
> > > >
> > > > > Thanks for this answer, Gyula!
> > > > > -John
> > > > >
> > > > > On Thu, Nov 24, 2022, at 14:53, Gyula Fóra wrote:
> > > > > > Hi John!
> > > > > >
> > > > > > Thank you for the excellent question.
> > > > > >
> > > > > > There are few reasons why we felt that the operator is the right
> > > place
> > > > > for
> > > > > > this component:
> > > > > >
> > > > > >  - Ideally the 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-25 Thread Prasanna kumar
Thanks for the reply.

Gyula and Max.

Prasanna


On Sat, 26 Nov 2022, 00:24 Maximilian Michels,  wrote:

> Hi John, hi Prasanna, hi Rui,
>
> Gyula already gave great answers to your questions, just adding to it:
>
> >What’s the reason to add auto scaling to the Operator instead of to the
> JobManager?
>
> As Gyula mentioned, the JobManager is not the ideal place, at least not
> until Flink supports in-place autoscaling which is a related but ultimately
> very different problem because it involves solving job reconfiguration at
> runtime. I believe the AdaptiveScheduler has moved into this direction and
> there is nothing preventing us from using it in the future once it has
> evolved further. For now, going through the job redeployment route seems
> like the easiest and safest way.
>
> >Could we finally use the autoscaler as a outside tool? or run it as a
> separate java process?
>
> I think we could but I wouldn't make it a requirement for the first
> version. There is nothing preventing the autoscaler from running as a
> separate k8s/yarn deployment which would provide some of the same
> availability guarantees as the operator or any deployment has on k8s/yarn.
> However, I think this increases complexity by a fair bit because the
> operator already has all the configuration and tooling to manage Flink
> jobs. I'm not at all opposed to coming up with a way to allow the
> autoscaler to run separately as well as with the k8s operator. I just think
> it is out of scope for the first version to keep the complexity and scope
> under control.
>
> -Max
>
>
> On Fri, Nov 25, 2022 at 8:16 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi Gyula
> >
> > Thanks for the clarification!
> >
> > Best
> > Rui Fan
> >
> > On Fri, Nov 25, 2022 at 1:50 PM Gyula Fóra  wrote:
> >
> > > Rui, Prasanna:
> > >
> > > I am afraid that creating a completely independent autoscaler process
> > that
> > > works with any type of Flink clusters is out of scope right now due to
> > the
> > > following reasons:
> > >
> > > If we were to create a new general process, we would have to implement
> > high
> > > availability and a pluggable mechanism to durably store metadata etc.
> The
> > > process itself would also have to run somewhere so we would have to
> > provide
> > > integrations.
> > >
> > >  It would also not be able to scale clusters easily without adding
> > > Kubernetes-operator-like functionality to it, and if the user has to do
> > it
> > > manually most of the value is already lost.
> > >
> > > Last but not least this would have the potential of interfering with
> > other
> > > actions the user might be currently doing, making the autoscaler itself
> > > complex and more unreliable.
> > >
> > > These are all prohibitive reasons at this point. We already have a
> > > prototype that tackle these smoothly as part of the Kubernetes
> operator.
> > >
> > > Instead of trying to put the autoscaler somewhere else we might also
> > > consider supporting different cluster types within the Kubernetes
> > operator.
> > > While that might sound silly at first, it is of similar scope to your
> > > suggestions and could help the problem.
> > >
> > > As for the new config question, we could collectively decide to
> backport
> > > this feature to enable the autoscaler as it is a very minor change.
> > >
> > > Gyula
> > >
> > > On Fri, 25 Nov 2022 at 06:21, John Roesler 
> wrote:
> > >
> > > > Thanks for this answer, Gyula!
> > > > -John
> > > >
> > > > On Thu, Nov 24, 2022, at 14:53, Gyula Fóra wrote:
> > > > > Hi John!
> > > > >
> > > > > Thank you for the excellent question.
> > > > >
> > > > > There are few reasons why we felt that the operator is the right
> > place
> > > > for
> > > > > this component:
> > > > >
> > > > >  - Ideally the autoscaler is a separate process (an outside
> > observer) ,
> > > > and
> > > > > the jobmanager is very much tied to the lifecycle of the job. The
> > > > operator
> > > > > is a perfect example of such an external process that lives beyond
> > > > > individual jobs.
> > > > >  - Scaling itself might need some external resource management (for
> > > > > standalone clusters) that the jobmanager is not capable of, and the
> > > logic
> > > > > is already in the operator
> > > > > - Adding this to the operator allows us to integrate this fully in
> > the
> > > > > lifecycle management of the application. This guarantees that
> scaling
> > > > > decisions do not interfere with upgrades, suspends etc.
> > > > > - By adding it to the operator, the autoscaler can potentially work
> > on
> > > > > older Flink versions as well
> > > > > - The jobmanager is a component designed to handle Flink individual
> > > jobs,
> > > > > but the autoscaler component needs to work on a higher abstraction
> > > layer
> > > > to
> > > > > be able to integrate with user job upgrades etc.
> > > > >
> > > > > These are some of the main things that come to my mind :)
> > > > >
> > > > > Having it in the operator ties this logic to 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-25 Thread Maximilian Michels
Hi John, hi Prasanna, hi Rui,

Gyula already gave great answers to your questions, just adding to it:

>What’s the reason to add auto scaling to the Operator instead of to the
JobManager?

As Gyula mentioned, the JobManager is not the ideal place, at least not
until Flink supports in-place autoscaling which is a related but ultimately
very different problem because it involves solving job reconfiguration at
runtime. I believe the AdaptiveScheduler has moved into this direction and
there is nothing preventing us from using it in the future once it has
evolved further. For now, going through the job redeployment route seems
like the easiest and safest way.

>Could we finally use the autoscaler as a outside tool? or run it as a
separate java process?

I think we could but I wouldn't make it a requirement for the first
version. There is nothing preventing the autoscaler from running as a
separate k8s/yarn deployment which would provide some of the same
availability guarantees as the operator or any deployment has on k8s/yarn.
However, I think this increases complexity by a fair bit because the
operator already has all the configuration and tooling to manage Flink
jobs. I'm not at all opposed to coming up with a way to allow the
autoscaler to run separately as well as with the k8s operator. I just think
it is out of scope for the first version to keep the complexity and scope
under control.

-Max


On Fri, Nov 25, 2022 at 8:16 AM Rui Fan <1996fan...@gmail.com> wrote:

> Hi Gyula
>
> Thanks for the clarification!
>
> Best
> Rui Fan
>
> On Fri, Nov 25, 2022 at 1:50 PM Gyula Fóra  wrote:
>
> > Rui, Prasanna:
> >
> > I am afraid that creating a completely independent autoscaler process
> that
> > works with any type of Flink clusters is out of scope right now due to
> the
> > following reasons:
> >
> > If we were to create a new general process, we would have to implement
> high
> > availability and a pluggable mechanism to durably store metadata etc. The
> > process itself would also have to run somewhere so we would have to
> provide
> > integrations.
> >
> >  It would also not be able to scale clusters easily without adding
> > Kubernetes-operator-like functionality to it, and if the user has to do
> it
> > manually most of the value is already lost.
> >
> > Last but not least this would have the potential of interfering with
> other
> > actions the user might be currently doing, making the autoscaler itself
> > complex and more unreliable.
> >
> > These are all prohibitive reasons at this point. We already have a
> > prototype that tackle these smoothly as part of the Kubernetes operator.
> >
> > Instead of trying to put the autoscaler somewhere else we might also
> > consider supporting different cluster types within the Kubernetes
> operator.
> > While that might sound silly at first, it is of similar scope to your
> > suggestions and could help the problem.
> >
> > As for the new config question, we could collectively decide to backport
> > this feature to enable the autoscaler as it is a very minor change.
> >
> > Gyula
> >
> > On Fri, 25 Nov 2022 at 06:21, John Roesler  wrote:
> >
> > > Thanks for this answer, Gyula!
> > > -John
> > >
> > > On Thu, Nov 24, 2022, at 14:53, Gyula Fóra wrote:
> > > > Hi John!
> > > >
> > > > Thank you for the excellent question.
> > > >
> > > > There are few reasons why we felt that the operator is the right
> place
> > > for
> > > > this component:
> > > >
> > > >  - Ideally the autoscaler is a separate process (an outside
> observer) ,
> > > and
> > > > the jobmanager is very much tied to the lifecycle of the job. The
> > > operator
> > > > is a perfect example of such an external process that lives beyond
> > > > individual jobs.
> > > >  - Scaling itself might need some external resource management (for
> > > > standalone clusters) that the jobmanager is not capable of, and the
> > logic
> > > > is already in the operator
> > > > - Adding this to the operator allows us to integrate this fully in
> the
> > > > lifecycle management of the application. This guarantees that scaling
> > > > decisions do not interfere with upgrades, suspends etc.
> > > > - By adding it to the operator, the autoscaler can potentially work
> on
> > > > older Flink versions as well
> > > > - The jobmanager is a component designed to handle Flink individual
> > jobs,
> > > > but the autoscaler component needs to work on a higher abstraction
> > layer
> > > to
> > > > be able to integrate with user job upgrades etc.
> > > >
> > > > These are some of the main things that come to my mind :)
> > > >
> > > > Having it in the operator ties this logic to Kubernetes itself but we
> > > feel
> > > > that an autoscaler is mostly relevant in an elastic cloud environment
> > > > anyways.
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > On Thu, Nov 24, 2022 at 9:40 PM John Roesler 
> > > wrote:
> > > >
> > > >> Hi Max,
> > > >>
> > > >> Thanks for the FLIP!
> > > >>
> > > >> I’ve been curious about one 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-24 Thread Rui Fan
Hi Gyula

Thanks for the clarification!

Best
Rui Fan

On Fri, Nov 25, 2022 at 1:50 PM Gyula Fóra  wrote:

> Rui, Prasanna:
>
> I am afraid that creating a completely independent autoscaler process that
> works with any type of Flink clusters is out of scope right now due to the
> following reasons:
>
> If we were to create a new general process, we would have to implement high
> availability and a pluggable mechanism to durably store metadata etc. The
> process itself would also have to run somewhere so we would have to provide
> integrations.
>
>  It would also not be able to scale clusters easily without adding
> Kubernetes-operator-like functionality to it, and if the user has to do it
> manually most of the value is already lost.
>
> Last but not least this would have the potential of interfering with other
> actions the user might be currently doing, making the autoscaler itself
> complex and more unreliable.
>
> These are all prohibitive reasons at this point. We already have a
> prototype that tackle these smoothly as part of the Kubernetes operator.
>
> Instead of trying to put the autoscaler somewhere else we might also
> consider supporting different cluster types within the Kubernetes operator.
> While that might sound silly at first, it is of similar scope to your
> suggestions and could help the problem.
>
> As for the new config question, we could collectively decide to backport
> this feature to enable the autoscaler as it is a very minor change.
>
> Gyula
>
> On Fri, 25 Nov 2022 at 06:21, John Roesler  wrote:
>
> > Thanks for this answer, Gyula!
> > -John
> >
> > On Thu, Nov 24, 2022, at 14:53, Gyula Fóra wrote:
> > > Hi John!
> > >
> > > Thank you for the excellent question.
> > >
> > > There are few reasons why we felt that the operator is the right place
> > for
> > > this component:
> > >
> > >  - Ideally the autoscaler is a separate process (an outside observer) ,
> > and
> > > the jobmanager is very much tied to the lifecycle of the job. The
> > operator
> > > is a perfect example of such an external process that lives beyond
> > > individual jobs.
> > >  - Scaling itself might need some external resource management (for
> > > standalone clusters) that the jobmanager is not capable of, and the
> logic
> > > is already in the operator
> > > - Adding this to the operator allows us to integrate this fully in the
> > > lifecycle management of the application. This guarantees that scaling
> > > decisions do not interfere with upgrades, suspends etc.
> > > - By adding it to the operator, the autoscaler can potentially work on
> > > older Flink versions as well
> > > - The jobmanager is a component designed to handle Flink individual
> jobs,
> > > but the autoscaler component needs to work on a higher abstraction
> layer
> > to
> > > be able to integrate with user job upgrades etc.
> > >
> > > These are some of the main things that come to my mind :)
> > >
> > > Having it in the operator ties this logic to Kubernetes itself but we
> > feel
> > > that an autoscaler is mostly relevant in an elastic cloud environment
> > > anyways.
> > >
> > > Cheers,
> > > Gyula
> > >
> > > On Thu, Nov 24, 2022 at 9:40 PM John Roesler 
> > wrote:
> > >
> > >> Hi Max,
> > >>
> > >> Thanks for the FLIP!
> > >>
> > >> I’ve been curious about one one point. I can imagine some good reasons
> > for
> > >> it but wonder what you have in mind. What’s the reason to add auto
> > scaling
> > >> to the Operator instead of to the JobManager?
> > >>
> > >> It seems like adding that capability to the JobManager would be a
> bigger
> > >> project, but it also would create some interesting opportunities.
> > >>
> > >> This is certainly not a suggestion, just a question.
> > >>
> > >> Thanks!
> > >> John
> > >>
> > >> On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote:
> > >> > Thanks for your comments @Dong and @Chen. It is true that not all
> the
> > >> > details are contained in the FLIP. The document is meant as a
> general
> > >> > design concept.
> > >> >
> > >> > As for the rescaling time, this is going to be a configurable
> setting
> > for
> > >> > now but it is foreseeable that we will provide auto-tuning of this
> > >> > configuration value by observing the job restart time. Same goes for
> > the
> > >> > scaling decision itself which can learn from previous decisions. But
> > we
> > >> > want to keep it simple for the first version.
> > >> >
> > >> > For sources that do not support the pendingRecords metric, we are
> > >> planning
> > >> > to either give the user the choice to set a manual target rate, or
> > scale
> > >> it
> > >> > purely based on its utilization as reported via busyTimeMsPerSecond.
> > In
> > >> > case of legacy sources, we will skip scaling these branches entirely
> > >> > because they support neither of these metrics.
> > >> >
> > >> > -Max
> > >> >
> > >> > On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels  >
> > >> wrote:
> > >> >
> > >> >> >Do we think the scaler could be a plugin 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-24 Thread Gyula Fóra
Rui, Prasanna:

I am afraid that creating a completely independent autoscaler process that
works with any type of Flink clusters is out of scope right now due to the
following reasons:

If we were to create a new general process, we would have to implement high
availability and a pluggable mechanism to durably store metadata etc. The
process itself would also have to run somewhere so we would have to provide
integrations.

 It would also not be able to scale clusters easily without adding
Kubernetes-operator-like functionality to it, and if the user has to do it
manually most of the value is already lost.

Last but not least this would have the potential of interfering with other
actions the user might be currently doing, making the autoscaler itself
complex and more unreliable.

These are all prohibitive reasons at this point. We already have a
prototype that tackle these smoothly as part of the Kubernetes operator.

Instead of trying to put the autoscaler somewhere else we might also
consider supporting different cluster types within the Kubernetes operator.
While that might sound silly at first, it is of similar scope to your
suggestions and could help the problem.

As for the new config question, we could collectively decide to backport
this feature to enable the autoscaler as it is a very minor change.

Gyula

On Fri, 25 Nov 2022 at 06:21, John Roesler  wrote:

> Thanks for this answer, Gyula!
> -John
>
> On Thu, Nov 24, 2022, at 14:53, Gyula Fóra wrote:
> > Hi John!
> >
> > Thank you for the excellent question.
> >
> > There are few reasons why we felt that the operator is the right place
> for
> > this component:
> >
> >  - Ideally the autoscaler is a separate process (an outside observer) ,
> and
> > the jobmanager is very much tied to the lifecycle of the job. The
> operator
> > is a perfect example of such an external process that lives beyond
> > individual jobs.
> >  - Scaling itself might need some external resource management (for
> > standalone clusters) that the jobmanager is not capable of, and the logic
> > is already in the operator
> > - Adding this to the operator allows us to integrate this fully in the
> > lifecycle management of the application. This guarantees that scaling
> > decisions do not interfere with upgrades, suspends etc.
> > - By adding it to the operator, the autoscaler can potentially work on
> > older Flink versions as well
> > - The jobmanager is a component designed to handle Flink individual jobs,
> > but the autoscaler component needs to work on a higher abstraction layer
> to
> > be able to integrate with user job upgrades etc.
> >
> > These are some of the main things that come to my mind :)
> >
> > Having it in the operator ties this logic to Kubernetes itself but we
> feel
> > that an autoscaler is mostly relevant in an elastic cloud environment
> > anyways.
> >
> > Cheers,
> > Gyula
> >
> > On Thu, Nov 24, 2022 at 9:40 PM John Roesler 
> wrote:
> >
> >> Hi Max,
> >>
> >> Thanks for the FLIP!
> >>
> >> I’ve been curious about one one point. I can imagine some good reasons
> for
> >> it but wonder what you have in mind. What’s the reason to add auto
> scaling
> >> to the Operator instead of to the JobManager?
> >>
> >> It seems like adding that capability to the JobManager would be a bigger
> >> project, but it also would create some interesting opportunities.
> >>
> >> This is certainly not a suggestion, just a question.
> >>
> >> Thanks!
> >> John
> >>
> >> On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote:
> >> > Thanks for your comments @Dong and @Chen. It is true that not all the
> >> > details are contained in the FLIP. The document is meant as a general
> >> > design concept.
> >> >
> >> > As for the rescaling time, this is going to be a configurable setting
> for
> >> > now but it is foreseeable that we will provide auto-tuning of this
> >> > configuration value by observing the job restart time. Same goes for
> the
> >> > scaling decision itself which can learn from previous decisions. But
> we
> >> > want to keep it simple for the first version.
> >> >
> >> > For sources that do not support the pendingRecords metric, we are
> >> planning
> >> > to either give the user the choice to set a manual target rate, or
> scale
> >> it
> >> > purely based on its utilization as reported via busyTimeMsPerSecond.
> In
> >> > case of legacy sources, we will skip scaling these branches entirely
> >> > because they support neither of these metrics.
> >> >
> >> > -Max
> >> >
> >> > On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels 
> >> wrote:
> >> >
> >> >> >Do we think the scaler could be a plugin or hard coded ?
> >> >>
> >> >> +1 For pluggable scaling logic.
> >> >>
> >> >> On Mon, Nov 21, 2022 at 3:38 AM Chen Qin  wrote:
> >> >>
> >> >>> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra 
> >> wrote:
> >> >>>
> >> >>> > Hi Chen!
> >> >>> >
> >> >>> > I think in the long term it makes sense to provide some pluggable
> >> >>> > mechanisms but it's not completely 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-24 Thread John Roesler
Thanks for this answer, Gyula!
-John

On Thu, Nov 24, 2022, at 14:53, Gyula Fóra wrote:
> Hi John!
>
> Thank you for the excellent question.
>
> There are few reasons why we felt that the operator is the right place for
> this component:
>
>  - Ideally the autoscaler is a separate process (an outside observer) , and
> the jobmanager is very much tied to the lifecycle of the job. The operator
> is a perfect example of such an external process that lives beyond
> individual jobs.
>  - Scaling itself might need some external resource management (for
> standalone clusters) that the jobmanager is not capable of, and the logic
> is already in the operator
> - Adding this to the operator allows us to integrate this fully in the
> lifecycle management of the application. This guarantees that scaling
> decisions do not interfere with upgrades, suspends etc.
> - By adding it to the operator, the autoscaler can potentially work on
> older Flink versions as well
> - The jobmanager is a component designed to handle Flink individual jobs,
> but the autoscaler component needs to work on a higher abstraction layer to
> be able to integrate with user job upgrades etc.
>
> These are some of the main things that come to my mind :)
>
> Having it in the operator ties this logic to Kubernetes itself but we feel
> that an autoscaler is mostly relevant in an elastic cloud environment
> anyways.
>
> Cheers,
> Gyula
>
> On Thu, Nov 24, 2022 at 9:40 PM John Roesler  wrote:
>
>> Hi Max,
>>
>> Thanks for the FLIP!
>>
>> I’ve been curious about one one point. I can imagine some good reasons for
>> it but wonder what you have in mind. What’s the reason to add auto scaling
>> to the Operator instead of to the JobManager?
>>
>> It seems like adding that capability to the JobManager would be a bigger
>> project, but it also would create some interesting opportunities.
>>
>> This is certainly not a suggestion, just a question.
>>
>> Thanks!
>> John
>>
>> On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote:
>> > Thanks for your comments @Dong and @Chen. It is true that not all the
>> > details are contained in the FLIP. The document is meant as a general
>> > design concept.
>> >
>> > As for the rescaling time, this is going to be a configurable setting for
>> > now but it is foreseeable that we will provide auto-tuning of this
>> > configuration value by observing the job restart time. Same goes for the
>> > scaling decision itself which can learn from previous decisions. But we
>> > want to keep it simple for the first version.
>> >
>> > For sources that do not support the pendingRecords metric, we are
>> planning
>> > to either give the user the choice to set a manual target rate, or scale
>> it
>> > purely based on its utilization as reported via busyTimeMsPerSecond. In
>> > case of legacy sources, we will skip scaling these branches entirely
>> > because they support neither of these metrics.
>> >
>> > -Max
>> >
>> > On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels 
>> wrote:
>> >
>> >> >Do we think the scaler could be a plugin or hard coded ?
>> >>
>> >> +1 For pluggable scaling logic.
>> >>
>> >> On Mon, Nov 21, 2022 at 3:38 AM Chen Qin  wrote:
>> >>
>> >>> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra 
>> wrote:
>> >>>
>> >>> > Hi Chen!
>> >>> >
>> >>> > I think in the long term it makes sense to provide some pluggable
>> >>> > mechanisms but it's not completely trivial where exactly you would
>> plug
>> >>> in
>> >>> > your custom logic at this point.
>> >>> >
>> >>> sounds good, more specifically would be great if it can accept input
>> >>> features
>> >>> (including previous scaling decisions) and output decisions.
>> >>> Folks might keep their own secret sauce and avoid patching oss fork.
>> >>>
>> >>> >
>> >>> > In any case the problems you mentioned should be solved robustly by
>> the
>> >>> > algorithm itself without any customization:
>> >>> >  - We need to be able to detect ineffective scaling decisions, let\s
>> >>> say we
>> >>> > scaled up (expecting better throughput with a higher parallelism)
>> but we
>> >>> > did not get a better processing capacity (this would be the external
>> >>> > service bottleneck)
>> >>> >
>> >>> sounds good, so we would at least try restart job once (optimistic
>> path)
>> >>> as
>> >>> design choice.
>> >>>
>> >>> >  - We are evaluating metrics in windows, and we have some flexible
>> >>> > boundaries to avoid scaling on minor load spikes
>> >>> >
>> >>> yes, would be great if user can feed in throughput changes over
>> different
>> >>> time buckets (last 10s, 30s, 1 min,5 mins) as input features
>> >>>
>> >>> >
>> >>> > Regards,
>> >>> > Gyula
>> >>> >
>> >>> > On Sun, Nov 20, 2022 at 12:28 AM Chen Qin 
>> wrote:
>> >>> >
>> >>> > > Hi Gyula,
>> >>> > >
>> >>> > > Do we think the scaler could be a plugin or hard coded ?
>> >>> > > We observed some cases scaler can't address (e.g async io
>> dependency
>> >>> > > service degradation or small spike that doesn't worth restarting
>> job)

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-24 Thread Prasanna kumar
HI max,

This is a great initiative and good discussion going on.

We have set up flink cluster using Amazon ECS . So It would be good to
design in such a way that we can deploy the autoscaler in a separate docker
image which could observe the JM, JOBS and emit outputs that can use to
trigger the ECS to add new/delete existing TMs on request.

Thanks,
Prasanna.

On Fri, 25 Nov 2022, 07:39 Rui Fan, <1996fan...@gmail.com> wrote:

> Hi Gyula, Max, John!
>
> Thanks for the great FLIP, it's very useful for flink users.
>
> > Ideally the autoscaler is a separate process (an outside observer)
>
> Could we finally use the autoscaler as a outside tool? or run it as a
> separate java process? If it's complex, can the part that detects
>  the job and suggests parallelism be a separate java process?
>
> Since a large number of Flink jobs are still using Flink on yarn,
> this feature is also useful for them. I was wondering if some logs
> or advice can be provided if automatic scala is not working for
> Flink on yarn. For example: the parallelism suggested by
> vertex_1 is 100, and the parallelism suggested by vertex_2 is 150.
>
> With this information, the flink user can manually set
> reasonable parallelism. Or some flink platforms can integrate
> this tool and use `pipeline.jobvertex-parallelism-overrides`[1]
> to make autoscaler work on Flink on yarn.
>
> > By adding it to the operator, the autoscaler can potentially work on
> > older Flink versions as well
>
> As I understand, `pipeline.jobvertex-parallelism-overrides`[1]
> is supported in Flink 1.17, so old flink versions can only detect,
> not auto scala, right?
>
> [1] https://issues.apache.org/jira/browse/FLINK-29501
>
> Best
> Rui Fan
>
>
> On Fri, Nov 25, 2022 at 4:54 AM Gyula Fóra  wrote:
>
> > Hi John!
> >
> > Thank you for the excellent question.
> >
> > There are few reasons why we felt that the operator is the right place
> for
> > this component:
> >
> >  - Ideally the autoscaler is a separate process (an outside observer) ,
> and
> > the jobmanager is very much tied to the lifecycle of the job. The
> operator
> > is a perfect example of such an external process that lives beyond
> > individual jobs.
> >  - Scaling itself might need some external resource management (for
> > standalone clusters) that the jobmanager is not capable of, and the logic
> > is already in the operator
> > - Adding this to the operator allows us to integrate this fully in the
> > lifecycle management of the application. This guarantees that scaling
> > decisions do not interfere with upgrades, suspends etc.
> > - By adding it to the operator, the autoscaler can potentially work on
> > older Flink versions as well
> > - The jobmanager is a component designed to handle Flink individual jobs,
> > but the autoscaler component needs to work on a higher abstraction layer
> to
> > be able to integrate with user job upgrades etc.
> >
> > These are some of the main things that come to my mind :)
> >
> > Having it in the operator ties this logic to Kubernetes itself but we
> feel
> > that an autoscaler is mostly relevant in an elastic cloud environment
> > anyways.
> >
> > Cheers,
> > Gyula
> >
> > On Thu, Nov 24, 2022 at 9:40 PM John Roesler 
> wrote:
> >
> > > Hi Max,
> > >
> > > Thanks for the FLIP!
> > >
> > > I’ve been curious about one one point. I can imagine some good reasons
> > for
> > > it but wonder what you have in mind. What’s the reason to add auto
> > scaling
> > > to the Operator instead of to the JobManager?
> > >
> > > It seems like adding that capability to the JobManager would be a
> bigger
> > > project, but it also would create some interesting opportunities.
> > >
> > > This is certainly not a suggestion, just a question.
> > >
> > > Thanks!
> > > John
> > >
> > > On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote:
> > > > Thanks for your comments @Dong and @Chen. It is true that not all the
> > > > details are contained in the FLIP. The document is meant as a general
> > > > design concept.
> > > >
> > > > As for the rescaling time, this is going to be a configurable setting
> > for
> > > > now but it is foreseeable that we will provide auto-tuning of this
> > > > configuration value by observing the job restart time. Same goes for
> > the
> > > > scaling decision itself which can learn from previous decisions. But
> we
> > > > want to keep it simple for the first version.
> > > >
> > > > For sources that do not support the pendingRecords metric, we are
> > > planning
> > > > to either give the user the choice to set a manual target rate, or
> > scale
> > > it
> > > > purely based on its utilization as reported via busyTimeMsPerSecond.
> In
> > > > case of legacy sources, we will skip scaling these branches entirely
> > > > because they support neither of these metrics.
> > > >
> > > > -Max
> > > >
> > > > On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels 
> > > wrote:
> > > >
> > > >> >Do we think the scaler could be a plugin or hard coded ?
> > > >>
> 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-24 Thread Rui Fan
Hi Gyula, Max, John!

Thanks for the great FLIP, it's very useful for flink users.

> Ideally the autoscaler is a separate process (an outside observer)

Could we finally use the autoscaler as a outside tool? or run it as a
separate java process? If it's complex, can the part that detects
 the job and suggests parallelism be a separate java process?

Since a large number of Flink jobs are still using Flink on yarn,
this feature is also useful for them. I was wondering if some logs
or advice can be provided if automatic scala is not working for
Flink on yarn. For example: the parallelism suggested by
vertex_1 is 100, and the parallelism suggested by vertex_2 is 150.

With this information, the flink user can manually set
reasonable parallelism. Or some flink platforms can integrate
this tool and use `pipeline.jobvertex-parallelism-overrides`[1]
to make autoscaler work on Flink on yarn.

> By adding it to the operator, the autoscaler can potentially work on
> older Flink versions as well

As I understand, `pipeline.jobvertex-parallelism-overrides`[1]
is supported in Flink 1.17, so old flink versions can only detect,
not auto scala, right?

[1] https://issues.apache.org/jira/browse/FLINK-29501

Best
Rui Fan


On Fri, Nov 25, 2022 at 4:54 AM Gyula Fóra  wrote:

> Hi John!
>
> Thank you for the excellent question.
>
> There are few reasons why we felt that the operator is the right place for
> this component:
>
>  - Ideally the autoscaler is a separate process (an outside observer) , and
> the jobmanager is very much tied to the lifecycle of the job. The operator
> is a perfect example of such an external process that lives beyond
> individual jobs.
>  - Scaling itself might need some external resource management (for
> standalone clusters) that the jobmanager is not capable of, and the logic
> is already in the operator
> - Adding this to the operator allows us to integrate this fully in the
> lifecycle management of the application. This guarantees that scaling
> decisions do not interfere with upgrades, suspends etc.
> - By adding it to the operator, the autoscaler can potentially work on
> older Flink versions as well
> - The jobmanager is a component designed to handle Flink individual jobs,
> but the autoscaler component needs to work on a higher abstraction layer to
> be able to integrate with user job upgrades etc.
>
> These are some of the main things that come to my mind :)
>
> Having it in the operator ties this logic to Kubernetes itself but we feel
> that an autoscaler is mostly relevant in an elastic cloud environment
> anyways.
>
> Cheers,
> Gyula
>
> On Thu, Nov 24, 2022 at 9:40 PM John Roesler  wrote:
>
> > Hi Max,
> >
> > Thanks for the FLIP!
> >
> > I’ve been curious about one one point. I can imagine some good reasons
> for
> > it but wonder what you have in mind. What’s the reason to add auto
> scaling
> > to the Operator instead of to the JobManager?
> >
> > It seems like adding that capability to the JobManager would be a bigger
> > project, but it also would create some interesting opportunities.
> >
> > This is certainly not a suggestion, just a question.
> >
> > Thanks!
> > John
> >
> > On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote:
> > > Thanks for your comments @Dong and @Chen. It is true that not all the
> > > details are contained in the FLIP. The document is meant as a general
> > > design concept.
> > >
> > > As for the rescaling time, this is going to be a configurable setting
> for
> > > now but it is foreseeable that we will provide auto-tuning of this
> > > configuration value by observing the job restart time. Same goes for
> the
> > > scaling decision itself which can learn from previous decisions. But we
> > > want to keep it simple for the first version.
> > >
> > > For sources that do not support the pendingRecords metric, we are
> > planning
> > > to either give the user the choice to set a manual target rate, or
> scale
> > it
> > > purely based on its utilization as reported via busyTimeMsPerSecond. In
> > > case of legacy sources, we will skip scaling these branches entirely
> > > because they support neither of these metrics.
> > >
> > > -Max
> > >
> > > On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels 
> > wrote:
> > >
> > >> >Do we think the scaler could be a plugin or hard coded ?
> > >>
> > >> +1 For pluggable scaling logic.
> > >>
> > >> On Mon, Nov 21, 2022 at 3:38 AM Chen Qin  wrote:
> > >>
> > >>> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra 
> > wrote:
> > >>>
> > >>> > Hi Chen!
> > >>> >
> > >>> > I think in the long term it makes sense to provide some pluggable
> > >>> > mechanisms but it's not completely trivial where exactly you would
> > plug
> > >>> in
> > >>> > your custom logic at this point.
> > >>> >
> > >>> sounds good, more specifically would be great if it can accept input
> > >>> features
> > >>> (including previous scaling decisions) and output decisions.
> > >>> Folks might keep their own secret sauce and avoid patching oss 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-24 Thread Gyula Fóra
Hi John!

Thank you for the excellent question.

There are few reasons why we felt that the operator is the right place for
this component:

 - Ideally the autoscaler is a separate process (an outside observer) , and
the jobmanager is very much tied to the lifecycle of the job. The operator
is a perfect example of such an external process that lives beyond
individual jobs.
 - Scaling itself might need some external resource management (for
standalone clusters) that the jobmanager is not capable of, and the logic
is already in the operator
- Adding this to the operator allows us to integrate this fully in the
lifecycle management of the application. This guarantees that scaling
decisions do not interfere with upgrades, suspends etc.
- By adding it to the operator, the autoscaler can potentially work on
older Flink versions as well
- The jobmanager is a component designed to handle Flink individual jobs,
but the autoscaler component needs to work on a higher abstraction layer to
be able to integrate with user job upgrades etc.

These are some of the main things that come to my mind :)

Having it in the operator ties this logic to Kubernetes itself but we feel
that an autoscaler is mostly relevant in an elastic cloud environment
anyways.

Cheers,
Gyula

On Thu, Nov 24, 2022 at 9:40 PM John Roesler  wrote:

> Hi Max,
>
> Thanks for the FLIP!
>
> I’ve been curious about one one point. I can imagine some good reasons for
> it but wonder what you have in mind. What’s the reason to add auto scaling
> to the Operator instead of to the JobManager?
>
> It seems like adding that capability to the JobManager would be a bigger
> project, but it also would create some interesting opportunities.
>
> This is certainly not a suggestion, just a question.
>
> Thanks!
> John
>
> On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote:
> > Thanks for your comments @Dong and @Chen. It is true that not all the
> > details are contained in the FLIP. The document is meant as a general
> > design concept.
> >
> > As for the rescaling time, this is going to be a configurable setting for
> > now but it is foreseeable that we will provide auto-tuning of this
> > configuration value by observing the job restart time. Same goes for the
> > scaling decision itself which can learn from previous decisions. But we
> > want to keep it simple for the first version.
> >
> > For sources that do not support the pendingRecords metric, we are
> planning
> > to either give the user the choice to set a manual target rate, or scale
> it
> > purely based on its utilization as reported via busyTimeMsPerSecond. In
> > case of legacy sources, we will skip scaling these branches entirely
> > because they support neither of these metrics.
> >
> > -Max
> >
> > On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels 
> wrote:
> >
> >> >Do we think the scaler could be a plugin or hard coded ?
> >>
> >> +1 For pluggable scaling logic.
> >>
> >> On Mon, Nov 21, 2022 at 3:38 AM Chen Qin  wrote:
> >>
> >>> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra 
> wrote:
> >>>
> >>> > Hi Chen!
> >>> >
> >>> > I think in the long term it makes sense to provide some pluggable
> >>> > mechanisms but it's not completely trivial where exactly you would
> plug
> >>> in
> >>> > your custom logic at this point.
> >>> >
> >>> sounds good, more specifically would be great if it can accept input
> >>> features
> >>> (including previous scaling decisions) and output decisions.
> >>> Folks might keep their own secret sauce and avoid patching oss fork.
> >>>
> >>> >
> >>> > In any case the problems you mentioned should be solved robustly by
> the
> >>> > algorithm itself without any customization:
> >>> >  - We need to be able to detect ineffective scaling decisions, let\s
> >>> say we
> >>> > scaled up (expecting better throughput with a higher parallelism)
> but we
> >>> > did not get a better processing capacity (this would be the external
> >>> > service bottleneck)
> >>> >
> >>> sounds good, so we would at least try restart job once (optimistic
> path)
> >>> as
> >>> design choice.
> >>>
> >>> >  - We are evaluating metrics in windows, and we have some flexible
> >>> > boundaries to avoid scaling on minor load spikes
> >>> >
> >>> yes, would be great if user can feed in throughput changes over
> different
> >>> time buckets (last 10s, 30s, 1 min,5 mins) as input features
> >>>
> >>> >
> >>> > Regards,
> >>> > Gyula
> >>> >
> >>> > On Sun, Nov 20, 2022 at 12:28 AM Chen Qin 
> wrote:
> >>> >
> >>> > > Hi Gyula,
> >>> > >
> >>> > > Do we think the scaler could be a plugin or hard coded ?
> >>> > > We observed some cases scaler can't address (e.g async io
> dependency
> >>> > > service degradation or small spike that doesn't worth restarting
> job)
> >>> > >
> >>> > > Thanks,
> >>> > > Chen
> >>> > >
> >>> > > On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra 
> >>> wrote:
> >>> > >
> >>> > > > Hi Dong!
> >>> > > >
> >>> > > > Could you please confirm that your main concerns have been
> >>> 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-24 Thread John Roesler
Hi Max,

Thanks for the FLIP!

I’ve been curious about one one point. I can imagine some good reasons for it 
but wonder what you have in mind. What’s the reason to add auto scaling to the 
Operator instead of to the JobManager?

It seems like adding that capability to the JobManager would be a bigger 
project, but it also would create some interesting opportunities.

This is certainly not a suggestion, just a question. 

Thanks!
John 

On Wed, Nov 23, 2022, at 10:12, Maximilian Michels wrote:
> Thanks for your comments @Dong and @Chen. It is true that not all the
> details are contained in the FLIP. The document is meant as a general
> design concept.
>
> As for the rescaling time, this is going to be a configurable setting for
> now but it is foreseeable that we will provide auto-tuning of this
> configuration value by observing the job restart time. Same goes for the
> scaling decision itself which can learn from previous decisions. But we
> want to keep it simple for the first version.
>
> For sources that do not support the pendingRecords metric, we are planning
> to either give the user the choice to set a manual target rate, or scale it
> purely based on its utilization as reported via busyTimeMsPerSecond. In
> case of legacy sources, we will skip scaling these branches entirely
> because they support neither of these metrics.
>
> -Max
>
> On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels  wrote:
>
>> >Do we think the scaler could be a plugin or hard coded ?
>>
>> +1 For pluggable scaling logic.
>>
>> On Mon, Nov 21, 2022 at 3:38 AM Chen Qin  wrote:
>>
>>> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra  wrote:
>>>
>>> > Hi Chen!
>>> >
>>> > I think in the long term it makes sense to provide some pluggable
>>> > mechanisms but it's not completely trivial where exactly you would plug
>>> in
>>> > your custom logic at this point.
>>> >
>>> sounds good, more specifically would be great if it can accept input
>>> features
>>> (including previous scaling decisions) and output decisions.
>>> Folks might keep their own secret sauce and avoid patching oss fork.
>>>
>>> >
>>> > In any case the problems you mentioned should be solved robustly by the
>>> > algorithm itself without any customization:
>>> >  - We need to be able to detect ineffective scaling decisions, let\s
>>> say we
>>> > scaled up (expecting better throughput with a higher parallelism) but we
>>> > did not get a better processing capacity (this would be the external
>>> > service bottleneck)
>>> >
>>> sounds good, so we would at least try restart job once (optimistic path)
>>> as
>>> design choice.
>>>
>>> >  - We are evaluating metrics in windows, and we have some flexible
>>> > boundaries to avoid scaling on minor load spikes
>>> >
>>> yes, would be great if user can feed in throughput changes over different
>>> time buckets (last 10s, 30s, 1 min,5 mins) as input features
>>>
>>> >
>>> > Regards,
>>> > Gyula
>>> >
>>> > On Sun, Nov 20, 2022 at 12:28 AM Chen Qin  wrote:
>>> >
>>> > > Hi Gyula,
>>> > >
>>> > > Do we think the scaler could be a plugin or hard coded ?
>>> > > We observed some cases scaler can't address (e.g async io dependency
>>> > > service degradation or small spike that doesn't worth restarting job)
>>> > >
>>> > > Thanks,
>>> > > Chen
>>> > >
>>> > > On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra 
>>> wrote:
>>> > >
>>> > > > Hi Dong!
>>> > > >
>>> > > > Could you please confirm that your main concerns have been
>>> addressed?
>>> > > >
>>> > > > Some other minor details that might not have been fully clarified:
>>> > > >  - The prototype has been validated on some production workloads yes
>>> > > >  - We are only planning to use metrics that are generally available
>>> and
>>> > > are
>>> > > > previously accepted to be standardized connector metrics (not Kafka
>>> > > > specific). This is actually specified in the FLIP
>>> > > >  - Even if some metrics (such as pendingRecords) are not accessible
>>> the
>>> > > > scaling algorithm works and can be used. For source scaling based on
>>> > > > utilization alone we still need some trivial modifications on the
>>> > > > implementation side.
>>> > > >
>>> > > > Cheers,
>>> > > > Gyula
>>> > > >
>>> > > > On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra 
>>> > wrote:
>>> > > >
>>> > > > > Hi Dong!
>>> > > > >
>>> > > > > This is not an experimental feature proposal. The implementation
>>> of
>>> > the
>>> > > > > prototype is still in an experimental phase but by the time the
>>> FLIP,
>>> > > > > initial prototype and review is done, this should be in a good
>>> stable
>>> > > > first
>>> > > > > version.
>>> > > > > This proposal is pretty general as autoscalers/tuners get as far
>>> as I
>>> > > > > understand and there is no history of any alternative effort that
>>> > even
>>> > > > > comes close to the applicability of this solution.
>>> > > > >
>>> > > > > Any large features that were added to Flink in the past have gone
>>> > > through
>>> > > > > several iterations over the years 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-23 Thread Maximilian Michels
Thanks for your comments @Dong and @Chen. It is true that not all the
details are contained in the FLIP. The document is meant as a general
design concept.

As for the rescaling time, this is going to be a configurable setting for
now but it is foreseeable that we will provide auto-tuning of this
configuration value by observing the job restart time. Same goes for the
scaling decision itself which can learn from previous decisions. But we
want to keep it simple for the first version.

For sources that do not support the pendingRecords metric, we are planning
to either give the user the choice to set a manual target rate, or scale it
purely based on its utilization as reported via busyTimeMsPerSecond. In
case of legacy sources, we will skip scaling these branches entirely
because they support neither of these metrics.

-Max

On Mon, Nov 21, 2022 at 11:27 AM Maximilian Michels  wrote:

> >Do we think the scaler could be a plugin or hard coded ?
>
> +1 For pluggable scaling logic.
>
> On Mon, Nov 21, 2022 at 3:38 AM Chen Qin  wrote:
>
>> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra  wrote:
>>
>> > Hi Chen!
>> >
>> > I think in the long term it makes sense to provide some pluggable
>> > mechanisms but it's not completely trivial where exactly you would plug
>> in
>> > your custom logic at this point.
>> >
>> sounds good, more specifically would be great if it can accept input
>> features
>> (including previous scaling decisions) and output decisions.
>> Folks might keep their own secret sauce and avoid patching oss fork.
>>
>> >
>> > In any case the problems you mentioned should be solved robustly by the
>> > algorithm itself without any customization:
>> >  - We need to be able to detect ineffective scaling decisions, let\s
>> say we
>> > scaled up (expecting better throughput with a higher parallelism) but we
>> > did not get a better processing capacity (this would be the external
>> > service bottleneck)
>> >
>> sounds good, so we would at least try restart job once (optimistic path)
>> as
>> design choice.
>>
>> >  - We are evaluating metrics in windows, and we have some flexible
>> > boundaries to avoid scaling on minor load spikes
>> >
>> yes, would be great if user can feed in throughput changes over different
>> time buckets (last 10s, 30s, 1 min,5 mins) as input features
>>
>> >
>> > Regards,
>> > Gyula
>> >
>> > On Sun, Nov 20, 2022 at 12:28 AM Chen Qin  wrote:
>> >
>> > > Hi Gyula,
>> > >
>> > > Do we think the scaler could be a plugin or hard coded ?
>> > > We observed some cases scaler can't address (e.g async io dependency
>> > > service degradation or small spike that doesn't worth restarting job)
>> > >
>> > > Thanks,
>> > > Chen
>> > >
>> > > On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra 
>> wrote:
>> > >
>> > > > Hi Dong!
>> > > >
>> > > > Could you please confirm that your main concerns have been
>> addressed?
>> > > >
>> > > > Some other minor details that might not have been fully clarified:
>> > > >  - The prototype has been validated on some production workloads yes
>> > > >  - We are only planning to use metrics that are generally available
>> and
>> > > are
>> > > > previously accepted to be standardized connector metrics (not Kafka
>> > > > specific). This is actually specified in the FLIP
>> > > >  - Even if some metrics (such as pendingRecords) are not accessible
>> the
>> > > > scaling algorithm works and can be used. For source scaling based on
>> > > > utilization alone we still need some trivial modifications on the
>> > > > implementation side.
>> > > >
>> > > > Cheers,
>> > > > Gyula
>> > > >
>> > > > On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra 
>> > wrote:
>> > > >
>> > > > > Hi Dong!
>> > > > >
>> > > > > This is not an experimental feature proposal. The implementation
>> of
>> > the
>> > > > > prototype is still in an experimental phase but by the time the
>> FLIP,
>> > > > > initial prototype and review is done, this should be in a good
>> stable
>> > > > first
>> > > > > version.
>> > > > > This proposal is pretty general as autoscalers/tuners get as far
>> as I
>> > > > > understand and there is no history of any alternative effort that
>> > even
>> > > > > comes close to the applicability of this solution.
>> > > > >
>> > > > > Any large features that were added to Flink in the past have gone
>> > > through
>> > > > > several iterations over the years and the APIs have evolved as
>> they
>> > > > matured.
>> > > > > Something like the autoscaler can only be successful if there is
>> > enough
>> > > > > user exposure and feedback to make it good, putting it in an
>> external
>> > > > repo
>> > > > > will not get us anywhere.
>> > > > >
>> > > > > We have a prototype implementation ready that works well and it is
>> > more
>> > > > or
>> > > > > less feature complete. We proposed this FLIP based on something
>> that
>> > we
>> > > > see
>> > > > > as a working solution, please do not underestimate the effort that
>> > went
>> > > > > into this proposal and the validation of 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-21 Thread Maximilian Michels
>Do we think the scaler could be a plugin or hard coded ?

+1 For pluggable scaling logic.

On Mon, Nov 21, 2022 at 3:38 AM Chen Qin  wrote:

> On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra  wrote:
>
> > Hi Chen!
> >
> > I think in the long term it makes sense to provide some pluggable
> > mechanisms but it's not completely trivial where exactly you would plug
> in
> > your custom logic at this point.
> >
> sounds good, more specifically would be great if it can accept input
> features
> (including previous scaling decisions) and output decisions.
> Folks might keep their own secret sauce and avoid patching oss fork.
>
> >
> > In any case the problems you mentioned should be solved robustly by the
> > algorithm itself without any customization:
> >  - We need to be able to detect ineffective scaling decisions, let\s say
> we
> > scaled up (expecting better throughput with a higher parallelism) but we
> > did not get a better processing capacity (this would be the external
> > service bottleneck)
> >
> sounds good, so we would at least try restart job once (optimistic path) as
> design choice.
>
> >  - We are evaluating metrics in windows, and we have some flexible
> > boundaries to avoid scaling on minor load spikes
> >
> yes, would be great if user can feed in throughput changes over different
> time buckets (last 10s, 30s, 1 min,5 mins) as input features
>
> >
> > Regards,
> > Gyula
> >
> > On Sun, Nov 20, 2022 at 12:28 AM Chen Qin  wrote:
> >
> > > Hi Gyula,
> > >
> > > Do we think the scaler could be a plugin or hard coded ?
> > > We observed some cases scaler can't address (e.g async io dependency
> > > service degradation or small spike that doesn't worth restarting job)
> > >
> > > Thanks,
> > > Chen
> > >
> > > On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra 
> wrote:
> > >
> > > > Hi Dong!
> > > >
> > > > Could you please confirm that your main concerns have been addressed?
> > > >
> > > > Some other minor details that might not have been fully clarified:
> > > >  - The prototype has been validated on some production workloads yes
> > > >  - We are only planning to use metrics that are generally available
> and
> > > are
> > > > previously accepted to be standardized connector metrics (not Kafka
> > > > specific). This is actually specified in the FLIP
> > > >  - Even if some metrics (such as pendingRecords) are not accessible
> the
> > > > scaling algorithm works and can be used. For source scaling based on
> > > > utilization alone we still need some trivial modifications on the
> > > > implementation side.
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra 
> > wrote:
> > > >
> > > > > Hi Dong!
> > > > >
> > > > > This is not an experimental feature proposal. The implementation of
> > the
> > > > > prototype is still in an experimental phase but by the time the
> FLIP,
> > > > > initial prototype and review is done, this should be in a good
> stable
> > > > first
> > > > > version.
> > > > > This proposal is pretty general as autoscalers/tuners get as far
> as I
> > > > > understand and there is no history of any alternative effort that
> > even
> > > > > comes close to the applicability of this solution.
> > > > >
> > > > > Any large features that were added to Flink in the past have gone
> > > through
> > > > > several iterations over the years and the APIs have evolved as they
> > > > matured.
> > > > > Something like the autoscaler can only be successful if there is
> > enough
> > > > > user exposure and feedback to make it good, putting it in an
> external
> > > > repo
> > > > > will not get us anywhere.
> > > > >
> > > > > We have a prototype implementation ready that works well and it is
> > more
> > > > or
> > > > > less feature complete. We proposed this FLIP based on something
> that
> > we
> > > > see
> > > > > as a working solution, please do not underestimate the effort that
> > went
> > > > > into this proposal and the validation of the ideas. So in this
> sense
> > > our
> > > > > approach here is the same as with the Table Store and Kubernetes
> > > Operator
> > > > > and other big components of the past. On the other hand it's
> > impossible
> > > > to
> > > > > sufficiently explain all the technical depth/implementation details
> > of
> > > > such
> > > > > complex components in FLIPs to 100%, I feel we have a good overview
> > of
> > > > the
> > > > > algorithm in the FLIP and the implementation should cover all
> > remaining
> > > > > questions. We will have an extended code review phase following the
> > > FLIP
> > > > > vote before this make it into the project.
> > > > >
> > > > > I understand your concern regarding the stability of Flink
> Kubernetes
> > > > > Operator config and metric names. We have decided to not provide
> > > > guarantees
> > > > > there yet but if you feel that it's time for the operator to
> support
> > > such
> > > > > guarantees please open a separate discussion on that topic, I don't
> > > want
> > > > to
> 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-20 Thread Chen Qin
On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra  wrote:

> Hi Chen!
>
> I think in the long term it makes sense to provide some pluggable
> mechanisms but it's not completely trivial where exactly you would plug in
> your custom logic at this point.
>
sounds good, more specifically would be great if it can accept input features
(including previous scaling decisions) and output decisions.
Folks might keep their own secret sauce and avoid patching oss fork.

>
> In any case the problems you mentioned should be solved robustly by the
> algorithm itself without any customization:
>  - We need to be able to detect ineffective scaling decisions, let\s say we
> scaled up (expecting better throughput with a higher parallelism) but we
> did not get a better processing capacity (this would be the external
> service bottleneck)
>
sounds good, so we would at least try restart job once (optimistic path) as
design choice.

>  - We are evaluating metrics in windows, and we have some flexible
> boundaries to avoid scaling on minor load spikes
>
yes, would be great if user can feed in throughput changes over different
time buckets (last 10s, 30s, 1 min,5 mins) as input features

>
> Regards,
> Gyula
>
> On Sun, Nov 20, 2022 at 12:28 AM Chen Qin  wrote:
>
> > Hi Gyula,
> >
> > Do we think the scaler could be a plugin or hard coded ?
> > We observed some cases scaler can't address (e.g async io dependency
> > service degradation or small spike that doesn't worth restarting job)
> >
> > Thanks,
> > Chen
> >
> > On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra  wrote:
> >
> > > Hi Dong!
> > >
> > > Could you please confirm that your main concerns have been addressed?
> > >
> > > Some other minor details that might not have been fully clarified:
> > >  - The prototype has been validated on some production workloads yes
> > >  - We are only planning to use metrics that are generally available and
> > are
> > > previously accepted to be standardized connector metrics (not Kafka
> > > specific). This is actually specified in the FLIP
> > >  - Even if some metrics (such as pendingRecords) are not accessible the
> > > scaling algorithm works and can be used. For source scaling based on
> > > utilization alone we still need some trivial modifications on the
> > > implementation side.
> > >
> > > Cheers,
> > > Gyula
> > >
> > > On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra 
> wrote:
> > >
> > > > Hi Dong!
> > > >
> > > > This is not an experimental feature proposal. The implementation of
> the
> > > > prototype is still in an experimental phase but by the time the FLIP,
> > > > initial prototype and review is done, this should be in a good stable
> > > first
> > > > version.
> > > > This proposal is pretty general as autoscalers/tuners get as far as I
> > > > understand and there is no history of any alternative effort that
> even
> > > > comes close to the applicability of this solution.
> > > >
> > > > Any large features that were added to Flink in the past have gone
> > through
> > > > several iterations over the years and the APIs have evolved as they
> > > matured.
> > > > Something like the autoscaler can only be successful if there is
> enough
> > > > user exposure and feedback to make it good, putting it in an external
> > > repo
> > > > will not get us anywhere.
> > > >
> > > > We have a prototype implementation ready that works well and it is
> more
> > > or
> > > > less feature complete. We proposed this FLIP based on something that
> we
> > > see
> > > > as a working solution, please do not underestimate the effort that
> went
> > > > into this proposal and the validation of the ideas. So in this sense
> > our
> > > > approach here is the same as with the Table Store and Kubernetes
> > Operator
> > > > and other big components of the past. On the other hand it's
> impossible
> > > to
> > > > sufficiently explain all the technical depth/implementation details
> of
> > > such
> > > > complex components in FLIPs to 100%, I feel we have a good overview
> of
> > > the
> > > > algorithm in the FLIP and the implementation should cover all
> remaining
> > > > questions. We will have an extended code review phase following the
> > FLIP
> > > > vote before this make it into the project.
> > > >
> > > > I understand your concern regarding the stability of Flink Kubernetes
> > > > Operator config and metric names. We have decided to not provide
> > > guarantees
> > > > there yet but if you feel that it's time for the operator to support
> > such
> > > > guarantees please open a separate discussion on that topic, I don't
> > want
> > > to
> > > > mix the two problems here.
> > > >
> > > > Regards,
> > > > Gyula
> > > >
> > > > On Thu, Nov 17, 2022 at 5:07 PM Dong Lin 
> wrote:
> > > >
> > > >> Hi Gyula,
> > > >>
> > > >> If I understand correctly, this autopilot proposal is an
> experimental
> > > >> feature and its configs/metrics are not mature enough to provide
> > > backward
> > > >> compatibility yet. And the proposal provides high-level ideas 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-20 Thread Gyula Fóra
Hi Chen!

I think in the long term it makes sense to provide some pluggable
mechanisms but it's not completely trivial where exactly you would plug in
your custom logic at this point.

In any case the problems you mentioned should be solved robustly by the
algorithm itself without any customization:
 - We need to be able to detect ineffective scaling decisions, let\s say we
scaled up (expecting better throughput with a higher parallelism) but we
did not get a better processing capacity (this would be the external
service bottleneck)
 - We are evaluating metrics in windows, and we have some flexible
boundaries to avoid scaling on minor load spikes

Regards,
Gyula

On Sun, Nov 20, 2022 at 12:28 AM Chen Qin  wrote:

> Hi Gyula,
>
> Do we think the scaler could be a plugin or hard coded ?
> We observed some cases scaler can't address (e.g async io dependency
> service degradation or small spike that doesn't worth restarting job)
>
> Thanks,
> Chen
>
> On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra  wrote:
>
> > Hi Dong!
> >
> > Could you please confirm that your main concerns have been addressed?
> >
> > Some other minor details that might not have been fully clarified:
> >  - The prototype has been validated on some production workloads yes
> >  - We are only planning to use metrics that are generally available and
> are
> > previously accepted to be standardized connector metrics (not Kafka
> > specific). This is actually specified in the FLIP
> >  - Even if some metrics (such as pendingRecords) are not accessible the
> > scaling algorithm works and can be used. For source scaling based on
> > utilization alone we still need some trivial modifications on the
> > implementation side.
> >
> > Cheers,
> > Gyula
> >
> > On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra  wrote:
> >
> > > Hi Dong!
> > >
> > > This is not an experimental feature proposal. The implementation of the
> > > prototype is still in an experimental phase but by the time the FLIP,
> > > initial prototype and review is done, this should be in a good stable
> > first
> > > version.
> > > This proposal is pretty general as autoscalers/tuners get as far as I
> > > understand and there is no history of any alternative effort that even
> > > comes close to the applicability of this solution.
> > >
> > > Any large features that were added to Flink in the past have gone
> through
> > > several iterations over the years and the APIs have evolved as they
> > matured.
> > > Something like the autoscaler can only be successful if there is enough
> > > user exposure and feedback to make it good, putting it in an external
> > repo
> > > will not get us anywhere.
> > >
> > > We have a prototype implementation ready that works well and it is more
> > or
> > > less feature complete. We proposed this FLIP based on something that we
> > see
> > > as a working solution, please do not underestimate the effort that went
> > > into this proposal and the validation of the ideas. So in this sense
> our
> > > approach here is the same as with the Table Store and Kubernetes
> Operator
> > > and other big components of the past. On the other hand it's impossible
> > to
> > > sufficiently explain all the technical depth/implementation details of
> > such
> > > complex components in FLIPs to 100%, I feel we have a good overview of
> > the
> > > algorithm in the FLIP and the implementation should cover all remaining
> > > questions. We will have an extended code review phase following the
> FLIP
> > > vote before this make it into the project.
> > >
> > > I understand your concern regarding the stability of Flink Kubernetes
> > > Operator config and metric names. We have decided to not provide
> > guarantees
> > > there yet but if you feel that it's time for the operator to support
> such
> > > guarantees please open a separate discussion on that topic, I don't
> want
> > to
> > > mix the two problems here.
> > >
> > > Regards,
> > > Gyula
> > >
> > > On Thu, Nov 17, 2022 at 5:07 PM Dong Lin  wrote:
> > >
> > >> Hi Gyula,
> > >>
> > >> If I understand correctly, this autopilot proposal is an experimental
> > >> feature and its configs/metrics are not mature enough to provide
> > backward
> > >> compatibility yet. And the proposal provides high-level ideas of the
> > >> algorithm but it is probably too complicated to explain it end-to-end.
> > >>
> > >> On the one hand, I do agree that having an auto-tuning prototype, even
> > if
> > >> not mature, is better than nothing for Flink users. On the other
> hand, I
> > >> am
> > >> concerned that this FLIP seems a bit too experimental, and starting
> with
> > >> an
> > >> immature design might make it harder for us to reach a
> production-ready
> > >> and
> > >> generally applicable auto-tuner in the future. And introducing too
> > >> backward
> > >> incompatible changes generally hurts users' trust in the Flink
> project.
> > >>
> > >> One alternative might be to develop and experiment with this feature
> in
> > a
> > >> non-Flink 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-19 Thread Chen Qin
Hi Gyula,

Do we think the scaler could be a plugin or hard coded ?
We observed some cases scaler can't address (e.g async io dependency
service degradation or small spike that doesn't worth restarting job)

Thanks,
Chen

On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra  wrote:

> Hi Dong!
>
> Could you please confirm that your main concerns have been addressed?
>
> Some other minor details that might not have been fully clarified:
>  - The prototype has been validated on some production workloads yes
>  - We are only planning to use metrics that are generally available and are
> previously accepted to be standardized connector metrics (not Kafka
> specific). This is actually specified in the FLIP
>  - Even if some metrics (such as pendingRecords) are not accessible the
> scaling algorithm works and can be used. For source scaling based on
> utilization alone we still need some trivial modifications on the
> implementation side.
>
> Cheers,
> Gyula
>
> On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra  wrote:
>
> > Hi Dong!
> >
> > This is not an experimental feature proposal. The implementation of the
> > prototype is still in an experimental phase but by the time the FLIP,
> > initial prototype and review is done, this should be in a good stable
> first
> > version.
> > This proposal is pretty general as autoscalers/tuners get as far as I
> > understand and there is no history of any alternative effort that even
> > comes close to the applicability of this solution.
> >
> > Any large features that were added to Flink in the past have gone through
> > several iterations over the years and the APIs have evolved as they
> matured.
> > Something like the autoscaler can only be successful if there is enough
> > user exposure and feedback to make it good, putting it in an external
> repo
> > will not get us anywhere.
> >
> > We have a prototype implementation ready that works well and it is more
> or
> > less feature complete. We proposed this FLIP based on something that we
> see
> > as a working solution, please do not underestimate the effort that went
> > into this proposal and the validation of the ideas. So in this sense our
> > approach here is the same as with the Table Store and Kubernetes Operator
> > and other big components of the past. On the other hand it's impossible
> to
> > sufficiently explain all the technical depth/implementation details of
> such
> > complex components in FLIPs to 100%, I feel we have a good overview of
> the
> > algorithm in the FLIP and the implementation should cover all remaining
> > questions. We will have an extended code review phase following the FLIP
> > vote before this make it into the project.
> >
> > I understand your concern regarding the stability of Flink Kubernetes
> > Operator config and metric names. We have decided to not provide
> guarantees
> > there yet but if you feel that it's time for the operator to support such
> > guarantees please open a separate discussion on that topic, I don't want
> to
> > mix the two problems here.
> >
> > Regards,
> > Gyula
> >
> > On Thu, Nov 17, 2022 at 5:07 PM Dong Lin  wrote:
> >
> >> Hi Gyula,
> >>
> >> If I understand correctly, this autopilot proposal is an experimental
> >> feature and its configs/metrics are not mature enough to provide
> backward
> >> compatibility yet. And the proposal provides high-level ideas of the
> >> algorithm but it is probably too complicated to explain it end-to-end.
> >>
> >> On the one hand, I do agree that having an auto-tuning prototype, even
> if
> >> not mature, is better than nothing for Flink users. On the other hand, I
> >> am
> >> concerned that this FLIP seems a bit too experimental, and starting with
> >> an
> >> immature design might make it harder for us to reach a production-ready
> >> and
> >> generally applicable auto-tuner in the future. And introducing too
> >> backward
> >> incompatible changes generally hurts users' trust in the Flink project.
> >>
> >> One alternative might be to develop and experiment with this feature in
> a
> >> non-Flink repo. You can iterate fast without worrying about typically
> >> backward compatibility requirement as required for most Flink public
> >> features. And once the feature is reasonably evaluated and mature
> enough,
> >> it will be much easier to explain the design and address all the issues
> >> mentioned above. For example, Jingsong implemented a Flink Table Store
> >> prototype
> >> 
> >> before
> >> proposing FLIP-188 in this thread
> >> .
> >>
> >> I don't intend to block your progress. Just my two cents. It will be
> great
> >> to hear more from other developers (e.g. in the voting thread).
> >>
> >> Thanks,
> >> Dong
> >>
> >>
> >> On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra 
> wrote:
> >>
> >> > Hi Dong,
> >> >
> >> > Let me address your comments.
> >> >
> >> > Time for scale / backlog processing 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-18 Thread Dong Lin
Hi Gyula!

Thanks for all the explanations!

Personally, I would like to see a full story of how the algorithm works
(e.g. how it determines the estimated time for scale), how users can get
the basic information needed to monitor the health/effectiveness of
autoscaler (e.g. metrics), and how the algorithm is expected to work with
sources that does not have the optional pendingRecords metric (e.g. it does
not seem clear how users can set the desired parallelism or target rate as
mentioned in the FLIP).

With that being said, I am very happy to see you guys taking efforts to
tackle this hard problem and I don't want to block the progress. I
understand it's impossible to sufficiently explain all the technical
depth/implementation details of such complex components in FLIPs to 100%. I
would be happy to see the FLIP go through if other developers/committers
give +1 for this FLIP.

Thanks,
Dong


On Fri, Nov 18, 2022 at 5:03 PM Gyula Fóra  wrote:

> Hi Dong!
>
> Could you please confirm that your main concerns have been addressed?
>
> Some other minor details that might not have been fully clarified:
>  - The prototype has been validated on some production workloads yes
>  - We are only planning to use metrics that are generally available and
> are previously accepted to be standardized connector metrics (not Kafka
> specific). This is actually specified in the FLIP
>  - Even if some metrics (such as pendingRecords) are not accessible the
> scaling algorithm works and can be used. For source scaling based on
> utilization alone we still need some trivial modifications on the
> implementation side.
>
> Cheers,
> Gyula
>
> On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra  wrote:
>
>> Hi Dong!
>>
>> This is not an experimental feature proposal. The implementation of the
>> prototype is still in an experimental phase but by the time the FLIP,
>> initial prototype and review is done, this should be in a good stable first
>> version.
>> This proposal is pretty general as autoscalers/tuners get as far as I
>> understand and there is no history of any alternative effort that even
>> comes close to the applicability of this solution.
>>
>> Any large features that were added to Flink in the past have gone through
>> several iterations over the years and the APIs have evolved as they matured.
>> Something like the autoscaler can only be successful if there is enough
>> user exposure and feedback to make it good, putting it in an external repo
>> will not get us anywhere.
>>
>> We have a prototype implementation ready that works well and it is more
>> or less feature complete. We proposed this FLIP based on something that we
>> see as a working solution, please do not underestimate the effort that went
>> into this proposal and the validation of the ideas. So in this sense our
>> approach here is the same as with the Table Store and Kubernetes Operator
>> and other big components of the past. On the other hand it's impossible to
>> sufficiently explain all the technical depth/implementation details of such
>> complex components in FLIPs to 100%, I feel we have a good overview of the
>> algorithm in the FLIP and the implementation should cover all remaining
>> questions. We will have an extended code review phase following the FLIP
>> vote before this make it into the project.
>>
>> I understand your concern regarding the stability of Flink Kubernetes
>> Operator config and metric names. We have decided to not provide guarantees
>> there yet but if you feel that it's time for the operator to support such
>> guarantees please open a separate discussion on that topic, I don't want to
>> mix the two problems here.
>>
>> Regards,
>> Gyula
>>
>> On Thu, Nov 17, 2022 at 5:07 PM Dong Lin  wrote:
>>
>>> Hi Gyula,
>>>
>>> If I understand correctly, this autopilot proposal is an experimental
>>> feature and its configs/metrics are not mature enough to provide backward
>>> compatibility yet. And the proposal provides high-level ideas of the
>>> algorithm but it is probably too complicated to explain it end-to-end.
>>>
>>> On the one hand, I do agree that having an auto-tuning prototype, even if
>>> not mature, is better than nothing for Flink users. On the other hand, I
>>> am
>>> concerned that this FLIP seems a bit too experimental, and starting with
>>> an
>>> immature design might make it harder for us to reach a production-ready
>>> and
>>> generally applicable auto-tuner in the future. And introducing too
>>> backward
>>> incompatible changes generally hurts users' trust in the Flink project.
>>>
>>> One alternative might be to develop and experiment with this feature in a
>>> non-Flink repo. You can iterate fast without worrying about typically
>>> backward compatibility requirement as required for most Flink public
>>> features. And once the feature is reasonably evaluated and mature enough,
>>> it will be much easier to explain the design and address all the issues
>>> mentioned above. For example, Jingsong implemented a Flink 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-18 Thread Gyula Fóra
Hi Dong!

Could you please confirm that your main concerns have been addressed?

Some other minor details that might not have been fully clarified:
 - The prototype has been validated on some production workloads yes
 - We are only planning to use metrics that are generally available and are
previously accepted to be standardized connector metrics (not Kafka
specific). This is actually specified in the FLIP
 - Even if some metrics (such as pendingRecords) are not accessible the
scaling algorithm works and can be used. For source scaling based on
utilization alone we still need some trivial modifications on the
implementation side.

Cheers,
Gyula

On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra  wrote:

> Hi Dong!
>
> This is not an experimental feature proposal. The implementation of the
> prototype is still in an experimental phase but by the time the FLIP,
> initial prototype and review is done, this should be in a good stable first
> version.
> This proposal is pretty general as autoscalers/tuners get as far as I
> understand and there is no history of any alternative effort that even
> comes close to the applicability of this solution.
>
> Any large features that were added to Flink in the past have gone through
> several iterations over the years and the APIs have evolved as they matured.
> Something like the autoscaler can only be successful if there is enough
> user exposure and feedback to make it good, putting it in an external repo
> will not get us anywhere.
>
> We have a prototype implementation ready that works well and it is more or
> less feature complete. We proposed this FLIP based on something that we see
> as a working solution, please do not underestimate the effort that went
> into this proposal and the validation of the ideas. So in this sense our
> approach here is the same as with the Table Store and Kubernetes Operator
> and other big components of the past. On the other hand it's impossible to
> sufficiently explain all the technical depth/implementation details of such
> complex components in FLIPs to 100%, I feel we have a good overview of the
> algorithm in the FLIP and the implementation should cover all remaining
> questions. We will have an extended code review phase following the FLIP
> vote before this make it into the project.
>
> I understand your concern regarding the stability of Flink Kubernetes
> Operator config and metric names. We have decided to not provide guarantees
> there yet but if you feel that it's time for the operator to support such
> guarantees please open a separate discussion on that topic, I don't want to
> mix the two problems here.
>
> Regards,
> Gyula
>
> On Thu, Nov 17, 2022 at 5:07 PM Dong Lin  wrote:
>
>> Hi Gyula,
>>
>> If I understand correctly, this autopilot proposal is an experimental
>> feature and its configs/metrics are not mature enough to provide backward
>> compatibility yet. And the proposal provides high-level ideas of the
>> algorithm but it is probably too complicated to explain it end-to-end.
>>
>> On the one hand, I do agree that having an auto-tuning prototype, even if
>> not mature, is better than nothing for Flink users. On the other hand, I
>> am
>> concerned that this FLIP seems a bit too experimental, and starting with
>> an
>> immature design might make it harder for us to reach a production-ready
>> and
>> generally applicable auto-tuner in the future. And introducing too
>> backward
>> incompatible changes generally hurts users' trust in the Flink project.
>>
>> One alternative might be to develop and experiment with this feature in a
>> non-Flink repo. You can iterate fast without worrying about typically
>> backward compatibility requirement as required for most Flink public
>> features. And once the feature is reasonably evaluated and mature enough,
>> it will be much easier to explain the design and address all the issues
>> mentioned above. For example, Jingsong implemented a Flink Table Store
>> prototype
>> 
>> before
>> proposing FLIP-188 in this thread
>> .
>>
>> I don't intend to block your progress. Just my two cents. It will be great
>> to hear more from other developers (e.g. in the voting thread).
>>
>> Thanks,
>> Dong
>>
>>
>> On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra  wrote:
>>
>> > Hi Dong,
>> >
>> > Let me address your comments.
>> >
>> > Time for scale / backlog processing time derivation:
>> > We can add some more details to the Flip but at this point the
>> > implementation is actually much simpler than the algorithm to describe
>> it.
>> > I would not like to add more equations etc because it just
>> overcomplicates
>> > something relatively simple in practice.
>> >
>> > In a nutshell: Time to recover  == lag / processing-rate-after-scaleup.
>> > It's fairly easy to see where this is going, but best to see in code.
>> >
>> > Using pendingRecords and alternative 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-17 Thread Gyula Fóra
Hi Dong!

This is not an experimental feature proposal. The implementation of the
prototype is still in an experimental phase but by the time the FLIP,
initial prototype and review is done, this should be in a good stable first
version.
This proposal is pretty general as autoscalers/tuners get as far as I
understand and there is no history of any alternative effort that even
comes close to the applicability of this solution.

Any large features that were added to Flink in the past have gone through
several iterations over the years and the APIs have evolved as they matured.
Something like the autoscaler can only be successful if there is enough
user exposure and feedback to make it good, putting it in an external repo
will not get us anywhere.

We have a prototype implementation ready that works well and it is more or
less feature complete. We proposed this FLIP based on something that we see
as a working solution, please do not underestimate the effort that went
into this proposal and the validation of the ideas. So in this sense our
approach here is the same as with the Table Store and Kubernetes Operator
and other big components of the past. On the other hand it's impossible to
sufficiently explain all the technical depth/implementation details of such
complex components in FLIPs to 100%, I feel we have a good overview of the
algorithm in the FLIP and the implementation should cover all remaining
questions. We will have an extended code review phase following the FLIP
vote before this make it into the project.

I understand your concern regarding the stability of Flink Kubernetes
Operator config and metric names. We have decided to not provide guarantees
there yet but if you feel that it's time for the operator to support such
guarantees please open a separate discussion on that topic, I don't want to
mix the two problems here.

Regards,
Gyula

On Thu, Nov 17, 2022 at 5:07 PM Dong Lin  wrote:

> Hi Gyula,
>
> If I understand correctly, this autopilot proposal is an experimental
> feature and its configs/metrics are not mature enough to provide backward
> compatibility yet. And the proposal provides high-level ideas of the
> algorithm but it is probably too complicated to explain it end-to-end.
>
> On the one hand, I do agree that having an auto-tuning prototype, even if
> not mature, is better than nothing for Flink users. On the other hand, I am
> concerned that this FLIP seems a bit too experimental, and starting with an
> immature design might make it harder for us to reach a production-ready and
> generally applicable auto-tuner in the future. And introducing too backward
> incompatible changes generally hurts users' trust in the Flink project.
>
> One alternative might be to develop and experiment with this feature in a
> non-Flink repo. You can iterate fast without worrying about typically
> backward compatibility requirement as required for most Flink public
> features. And once the feature is reasonably evaluated and mature enough,
> it will be much easier to explain the design and address all the issues
> mentioned above. For example, Jingsong implemented a Flink Table Store
> prototype
> 
> before
> proposing FLIP-188 in this thread
> .
>
> I don't intend to block your progress. Just my two cents. It will be great
> to hear more from other developers (e.g. in the voting thread).
>
> Thanks,
> Dong
>
>
> On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra  wrote:
>
> > Hi Dong,
> >
> > Let me address your comments.
> >
> > Time for scale / backlog processing time derivation:
> > We can add some more details to the Flip but at this point the
> > implementation is actually much simpler than the algorithm to describe
> it.
> > I would not like to add more equations etc because it just
> overcomplicates
> > something relatively simple in practice.
> >
> > In a nutshell: Time to recover  == lag / processing-rate-after-scaleup.
> > It's fairly easy to see where this is going, but best to see in code.
> >
> > Using pendingRecords and alternative mechanisms:
> > True that the current algorithm relies on pending records to effectively
> > compute the target source processing rates and therefore scale sources.
> > This is available for Kafka which is by far the most common streaming
> > source and is used by the majority of streaming applications currently.
> > It would be very easy to add alternative purely utilization based scaling
> > to the sources. We can start with the current proposal and add this along
> > the way before the first version.
> >
> > Metrics, Configs and Public API:
> > The autoscaler feature is proposed for the Flink Kubernetes Operator
> which
> > does not have the same API/config maturity and thus does not provide the
> > same guarantees.
> > We currently support backward compatibilty for the CRD itself and not the
> > configs or metrics. This does not 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-17 Thread Dong Lin
Hi Gyula,

If I understand correctly, this autopilot proposal is an experimental
feature and its configs/metrics are not mature enough to provide backward
compatibility yet. And the proposal provides high-level ideas of the
algorithm but it is probably too complicated to explain it end-to-end.

On the one hand, I do agree that having an auto-tuning prototype, even if
not mature, is better than nothing for Flink users. On the other hand, I am
concerned that this FLIP seems a bit too experimental, and starting with an
immature design might make it harder for us to reach a production-ready and
generally applicable auto-tuner in the future. And introducing too backward
incompatible changes generally hurts users' trust in the Flink project.

One alternative might be to develop and experiment with this feature in a
non-Flink repo. You can iterate fast without worrying about typically
backward compatibility requirement as required for most Flink public
features. And once the feature is reasonably evaluated and mature enough,
it will be much easier to explain the design and address all the issues
mentioned above. For example, Jingsong implemented a Flink Table Store
prototype
 before
proposing FLIP-188 in this thread
.

I don't intend to block your progress. Just my two cents. It will be great
to hear more from other developers (e.g. in the voting thread).

Thanks,
Dong


On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra  wrote:

> Hi Dong,
>
> Let me address your comments.
>
> Time for scale / backlog processing time derivation:
> We can add some more details to the Flip but at this point the
> implementation is actually much simpler than the algorithm to describe it.
> I would not like to add more equations etc because it just overcomplicates
> something relatively simple in practice.
>
> In a nutshell: Time to recover  == lag / processing-rate-after-scaleup.
> It's fairly easy to see where this is going, but best to see in code.
>
> Using pendingRecords and alternative mechanisms:
> True that the current algorithm relies on pending records to effectively
> compute the target source processing rates and therefore scale sources.
> This is available for Kafka which is by far the most common streaming
> source and is used by the majority of streaming applications currently.
> It would be very easy to add alternative purely utilization based scaling
> to the sources. We can start with the current proposal and add this along
> the way before the first version.
>
> Metrics, Configs and Public API:
> The autoscaler feature is proposed for the Flink Kubernetes Operator which
> does not have the same API/config maturity and thus does not provide the
> same guarantees.
> We currently support backward compatibilty for the CRD itself and not the
> configs or metrics. This does not mean that we do not aim to do so but at
> this stage we still have to clean up the details of the newly added
> components. In practice this means that if we manage to get the metrics /
> configs right at the first try we will keep them and provide compatibility,
> but if we feel that we missed something or we don't need something we can
> still remove it. It's a more pragmatic approach for such a component that
> is likely to evolve than setting everything in stone immediately.
>
> Cheers,
> Gyula
>
>
>
> On Wed, Nov 16, 2022 at 6:07 PM Dong Lin  wrote:
>
> > Thanks for the update! Please see comments inline.
> >
> > On Tue, Nov 15, 2022 at 11:46 PM Maximilian Michels 
> > wrote:
> >
> > > Of course! Let me know if your concerns are addressed. The wiki page
> has
> > > been updated.
> > >
> > > >It will be great to add this in the FLIP so that reviewers can
> > understand
> > > how the source parallelisms are computed and how the algorithm works
> > > end-to-end.
> > >
> > > I've updated the FLIP page to add more details on how the backlog-based
> > > scaling works (2).
> > >
> >
> > The algorithm is much more informative now.  The algorithm currently uses
> > "Estimated time for rescale" to derive new source parallelism. Could we
> > also specify in the FLIP how this value is derived?
> >
> > The algorithm currently uses pendingRecords to derive source parallelism.
> > It is an optional metric and KafkaSource currently reports this metric.
> So
> > it means that only the proposed algorithm currently only works when all
> > sources of the job are KafkaSource, right?
> >
> > This issue considerably limits the applicability of this FLIP. Do you
> think
> > most (if not all) streaming source will report this metric?
> Alternatively,
> > any chance we can have a fallback solution to evaluate the source
> > parallelism based on e.g. cpu or idle ratio for cases where this metric
> is
> > not available?
> >
> >
> > > >These metrics and configs are public API and need to be stable across
> > > minor versions, could we document 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-16 Thread Gyula Fóra
Hi Dong,

Let me address your comments.

Time for scale / backlog processing time derivation:
We can add some more details to the Flip but at this point the
implementation is actually much simpler than the algorithm to describe it.
I would not like to add more equations etc because it just overcomplicates
something relatively simple in practice.

In a nutshell: Time to recover  == lag / processing-rate-after-scaleup.
It's fairly easy to see where this is going, but best to see in code.

Using pendingRecords and alternative mechanisms:
True that the current algorithm relies on pending records to effectively
compute the target source processing rates and therefore scale sources.
This is available for Kafka which is by far the most common streaming
source and is used by the majority of streaming applications currently.
It would be very easy to add alternative purely utilization based scaling
to the sources. We can start with the current proposal and add this along
the way before the first version.

Metrics, Configs and Public API:
The autoscaler feature is proposed for the Flink Kubernetes Operator which
does not have the same API/config maturity and thus does not provide the
same guarantees.
We currently support backward compatibilty for the CRD itself and not the
configs or metrics. This does not mean that we do not aim to do so but at
this stage we still have to clean up the details of the newly added
components. In practice this means that if we manage to get the metrics /
configs right at the first try we will keep them and provide compatibility,
but if we feel that we missed something or we don't need something we can
still remove it. It's a more pragmatic approach for such a component that
is likely to evolve than setting everything in stone immediately.

Cheers,
Gyula



On Wed, Nov 16, 2022 at 6:07 PM Dong Lin  wrote:

> Thanks for the update! Please see comments inline.
>
> On Tue, Nov 15, 2022 at 11:46 PM Maximilian Michels 
> wrote:
>
> > Of course! Let me know if your concerns are addressed. The wiki page has
> > been updated.
> >
> > >It will be great to add this in the FLIP so that reviewers can
> understand
> > how the source parallelisms are computed and how the algorithm works
> > end-to-end.
> >
> > I've updated the FLIP page to add more details on how the backlog-based
> > scaling works (2).
> >
>
> The algorithm is much more informative now.  The algorithm currently uses
> "Estimated time for rescale" to derive new source parallelism. Could we
> also specify in the FLIP how this value is derived?
>
> The algorithm currently uses pendingRecords to derive source parallelism.
> It is an optional metric and KafkaSource currently reports this metric. So
> it means that only the proposed algorithm currently only works when all
> sources of the job are KafkaSource, right?
>
> This issue considerably limits the applicability of this FLIP. Do you think
> most (if not all) streaming source will report this metric? Alternatively,
> any chance we can have a fallback solution to evaluate the source
> parallelism based on e.g. cpu or idle ratio for cases where this metric is
> not available?
>
>
> > >These metrics and configs are public API and need to be stable across
> > minor versions, could we document them before finalizing the FLIP?
> >
> > Metrics and config changes are not strictly part of the public API but
> > Gyula has added a section.
> >
>
> Hmm... if metrics are not public API, then it might happen that we change
> the mbean path in a minor release and break users' monitoring tool.
> Similarly, we might change configs in a minor release that break user's job
> behavior. We probably want to avoid these breaking changes in minor
> releases.
>
> It is documented here
> <
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >
> that
> "Exposed monitoring information" and "Configuration settings" are public
> interfaces of the project.
>
> Maybe we should also specify the metric here so that users can safely setup
> dashboards and tools to track how the autopilot is working, similar to how
> metrics are documented in FLIP-33
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >
> ?
>
>
> > -Max
> >
> > On Tue, Nov 15, 2022 at 3:01 PM Dong Lin  wrote:
> >
> > > Hi Maximilian,
> > >
> > > It seems that the following comments from the previous discussions have
> > not
> > > been addressed yet. Any chance we can have them addressed before
> starting
> > > the voting thread?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra 
> wrote:
> > >
> > > > Hi Dong!
> > > >
> > > > Let me try to answer the questions :)
> > > >
> > > > 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time
> > > > spent in the main record processing loop for an operator if I
> > > > understand correctly. This includes IO operations too.
> > > >
> > > > 2: We should add this to the FLIP I agree. It 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-16 Thread Dong Lin
Thanks for the update! Please see comments inline.

On Tue, Nov 15, 2022 at 11:46 PM Maximilian Michels  wrote:

> Of course! Let me know if your concerns are addressed. The wiki page has
> been updated.
>
> >It will be great to add this in the FLIP so that reviewers can understand
> how the source parallelisms are computed and how the algorithm works
> end-to-end.
>
> I've updated the FLIP page to add more details on how the backlog-based
> scaling works (2).
>

The algorithm is much more informative now.  The algorithm currently uses
"Estimated time for rescale" to derive new source parallelism. Could we
also specify in the FLIP how this value is derived?

The algorithm currently uses pendingRecords to derive source parallelism.
It is an optional metric and KafkaSource currently reports this metric. So
it means that only the proposed algorithm currently only works when all
sources of the job are KafkaSource, right?

This issue considerably limits the applicability of this FLIP. Do you think
most (if not all) streaming source will report this metric? Alternatively,
any chance we can have a fallback solution to evaluate the source
parallelism based on e.g. cpu or idle ratio for cases where this metric is
not available?


> >These metrics and configs are public API and need to be stable across
> minor versions, could we document them before finalizing the FLIP?
>
> Metrics and config changes are not strictly part of the public API but
> Gyula has added a section.
>

Hmm... if metrics are not public API, then it might happen that we change
the mbean path in a minor release and break users' monitoring tool.
Similarly, we might change configs in a minor release that break user's job
behavior. We probably want to avoid these breaking changes in minor
releases.

It is documented here

that
"Exposed monitoring information" and "Configuration settings" are public
interfaces of the project.

Maybe we should also specify the metric here so that users can safely setup
dashboards and tools to track how the autopilot is working, similar to how
metrics are documented in FLIP-33

?


> -Max
>
> On Tue, Nov 15, 2022 at 3:01 PM Dong Lin  wrote:
>
> > Hi Maximilian,
> >
> > It seems that the following comments from the previous discussions have
> not
> > been addressed yet. Any chance we can have them addressed before starting
> > the voting thread?
> >
> > Thanks,
> > Dong
> >
> > On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra  wrote:
> >
> > > Hi Dong!
> > >
> > > Let me try to answer the questions :)
> > >
> > > 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time
> > > spent in the main record processing loop for an operator if I
> > > understand correctly. This includes IO operations too.
> > >
> > > 2: We should add this to the FLIP I agree. It would be a Duration
> config
> > > with the expected catch up time after rescaling (let's say 5 minutes).
> It
> > > could be computed based on the current data rate and the calculated max
> > > processing rate after the rescale.
> > >
> >
> > It will be great to add this in the FLIP so that reviewers can understand
> > how the source parallelisms are computed and how the algorithm works
> > end-to-end.
> >
> >
> > > 3: In the current proposal we don't have per operator configs. Target
> > > utilization would apply to all operators uniformly.
> > >
> > > 4: It should be configurable, yes.
> > >
> >
> > Since this config is a public API, could we update the FLIP accordingly
> to
> > provide this config?
> >
> >
> > >
> > > 5,6: The names haven't been finalized but I think these are minor
> > details.
> > > We could add concrete names to the FLIP :)
> > >
> >
> > These metrics and configs are public API and need to be stable across
> minor
> > versions, could we document them before finalizing the FLIP?
> >
> >
> > >
> > > Cheers,
> > > Gyula
> > >
> > >
> > > On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:
> > >
> > >> Hi Max,
> > >>
> > >> Thank you for the proposal. The proposal tackles a very important
> issue
> > >> for Flink users and the design looks promising overall!
> > >>
> > >> I have some questions to better understand the proposed public
> > interfaces
> > >> and the algorithm.
> > >>
> > >> 1) The proposal seems to assume that the operator's
> busyTimeMsPerSecond
> > >> could reach 1 sec. I believe this is mostly true for cpu-bound
> > operators.
> > >> Could you confirm that this can also be true for io-bound operators
> > such as
> > >> sinks? For example, suppose a Kafka Sink subtask has reached I/O
> > bottleneck
> > >> when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> > >> reach 1 sec?
> > >>
> > >> 2) It is said that "users can configure a maximum time to fully
> process
> > >> the backlog". The configuration section does not seem to provide this
> > >> 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-15 Thread Maximilian Michels
Of course! Let me know if your concerns are addressed. The wiki page has
been updated.

>It will be great to add this in the FLIP so that reviewers can understand
how the source parallelisms are computed and how the algorithm works
end-to-end.

I've updated the FLIP page to add more details on how the backlog-based
scaling works (2).

>These metrics and configs are public API and need to be stable across
minor versions, could we document them before finalizing the FLIP?

Metrics and config changes are not strictly part of the public API but
Gyula has added a section.

-Max

On Tue, Nov 15, 2022 at 3:01 PM Dong Lin  wrote:

> Hi Maximilian,
>
> It seems that the following comments from the previous discussions have not
> been addressed yet. Any chance we can have them addressed before starting
> the voting thread?
>
> Thanks,
> Dong
>
> On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra  wrote:
>
> > Hi Dong!
> >
> > Let me try to answer the questions :)
> >
> > 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time
> > spent in the main record processing loop for an operator if I
> > understand correctly. This includes IO operations too.
> >
> > 2: We should add this to the FLIP I agree. It would be a Duration config
> > with the expected catch up time after rescaling (let's say 5 minutes). It
> > could be computed based on the current data rate and the calculated max
> > processing rate after the rescale.
> >
>
> It will be great to add this in the FLIP so that reviewers can understand
> how the source parallelisms are computed and how the algorithm works
> end-to-end.
>
>
> > 3: In the current proposal we don't have per operator configs. Target
> > utilization would apply to all operators uniformly.
> >
> > 4: It should be configurable, yes.
> >
>
> Since this config is a public API, could we update the FLIP accordingly to
> provide this config?
>
>
> >
> > 5,6: The names haven't been finalized but I think these are minor
> details.
> > We could add concrete names to the FLIP :)
> >
>
> These metrics and configs are public API and need to be stable across minor
> versions, could we document them before finalizing the FLIP?
>
>
> >
> > Cheers,
> > Gyula
> >
> >
> > On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:
> >
> >> Hi Max,
> >>
> >> Thank you for the proposal. The proposal tackles a very important issue
> >> for Flink users and the design looks promising overall!
> >>
> >> I have some questions to better understand the proposed public
> interfaces
> >> and the algorithm.
> >>
> >> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
> >> could reach 1 sec. I believe this is mostly true for cpu-bound
> operators.
> >> Could you confirm that this can also be true for io-bound operators
> such as
> >> sinks? For example, suppose a Kafka Sink subtask has reached I/O
> bottleneck
> >> when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> >> reach 1 sec?
> >>
> >> 2) It is said that "users can configure a maximum time to fully process
> >> the backlog". The configuration section does not seem to provide this
> >> config. Could you specify this? And any chance this proposal can provide
> >> the formula for calculating the new processing rate?
> >>
> >> 3) How are users expected to specify the per-operator configs (e.g.
> >> target utilization)? For example, should users specify it
> programmatically
> >> in a DataStream/Table/SQL API?
> >>
> >> 4) How often will the Flink Kubernetes operator query metrics from
> >> JobManager? Is this configurable?
> >>
> >> 5) Could you specify the config name and default value for the proposed
> >> configs?
> >>
> >> 6) Could you add the name/mbean/type for the proposed metrics?
> >>
> >>
> >> Cheers,
> >> Dong
> >>
> >>
> >>
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-15 Thread Dong Lin
Hi Maximilian,

It seems that the following comments from the previous discussions have not
been addressed yet. Any chance we can have them addressed before starting
the voting thread?

Thanks,
Dong

On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra  wrote:

> Hi Dong!
>
> Let me try to answer the questions :)
>
> 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time
> spent in the main record processing loop for an operator if I
> understand correctly. This includes IO operations too.
>
> 2: We should add this to the FLIP I agree. It would be a Duration config
> with the expected catch up time after rescaling (let's say 5 minutes). It
> could be computed based on the current data rate and the calculated max
> processing rate after the rescale.
>

It will be great to add this in the FLIP so that reviewers can understand
how the source parallelisms are computed and how the algorithm works
end-to-end.


> 3: In the current proposal we don't have per operator configs. Target
> utilization would apply to all operators uniformly.
>
> 4: It should be configurable, yes.
>

Since this config is a public API, could we update the FLIP accordingly to
provide this config?


>
> 5,6: The names haven't been finalized but I think these are minor details.
> We could add concrete names to the FLIP :)
>

These metrics and configs are public API and need to be stable across minor
versions, could we document them before finalizing the FLIP?


>
> Cheers,
> Gyula
>
>
> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:
>
>> Hi Max,
>>
>> Thank you for the proposal. The proposal tackles a very important issue
>> for Flink users and the design looks promising overall!
>>
>> I have some questions to better understand the proposed public interfaces
>> and the algorithm.
>>
>> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
>> could reach 1 sec. I believe this is mostly true for cpu-bound operators.
>> Could you confirm that this can also be true for io-bound operators such as
>> sinks? For example, suppose a Kafka Sink subtask has reached I/O bottleneck
>> when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
>> reach 1 sec?
>>
>> 2) It is said that "users can configure a maximum time to fully process
>> the backlog". The configuration section does not seem to provide this
>> config. Could you specify this? And any chance this proposal can provide
>> the formula for calculating the new processing rate?
>>
>> 3) How are users expected to specify the per-operator configs (e.g.
>> target utilization)? For example, should users specify it programmatically
>> in a DataStream/Table/SQL API?
>>
>> 4) How often will the Flink Kubernetes operator query metrics from
>> JobManager? Is this configurable?
>>
>> 5) Could you specify the config name and default value for the proposed
>> configs?
>>
>> 6) Could you add the name/mbean/type for the proposed metrics?
>>
>>
>> Cheers,
>> Dong
>>
>>
>>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-15 Thread Gyula Fóra
I agree we should start the vote.

On a separate (but related) small discussion we could also decide
backporting https://issues.apache.org/jira/browse/FLINK-29501 for 1.16.1 so
that the autoscaler could be more efficiently developed and tested and to
make it 1.16 compatible.

Cheers,
Gyula

On Tue, Nov 15, 2022 at 2:09 PM Maximilian Michels  wrote:

> +1 If there are no further comments, I'll start a vote thread in the next
> few days.
>
> -Max
>
>
> On Tue, Nov 15, 2022 at 2:06 PM Zheng Yu Chen  wrote:
>
> > @Gyula  Have a  good news, now flip-256 now is finish and merge it .
> >  flip-271 discussion seems to have stopped and I wonder if there are any
> > other comments. Can we get to the polls and start this exciting feature
> 
> > Maybe I can get involved in developing this feature
> >
> >
> >
> > Gyula Fóra  于2022年11月8日周二 18:46写道:
> >
> > > I had 2 extra comments to Max's reply:
> > >
> > > 1. About pre-allocating resources:
> > > This could be done through the operator when the standalone deployment
> > mode
> > > is used relatively easily as there we have better control of
> > > pods/resources.
> > >
> > > 2. Session jobs:
> > > There is a FLIP (
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api
> > > )
> > > to support passing configuration when we submit jobs to the session
> > cluster
> > > through the rest api. Once that goes through, session jobs can also be
> > > scaled in a similar way through the configuration.
> > >
> > > Cheers,
> > > Gyula
> > >
> > >
> > > On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels 
> > wrote:
> > >
> > > > @Yang
> > > >
> > > > >Since the current auto-scaling needs to fully redeploy the
> > application,
> > > it
> > > > may fail to start due to lack of resources.
> > > >
> > > > Great suggestions. I agree that we will have to have to preallocate /
> > > > reserve resources to ensure the rescaling doesn't take longer as
> > > expected.
> > > > This is not only a problem when scaling up but also when scaling down
> > > > because any pods surrendered might be taken over by another
> deployment
> > > > during the rescaling. This would certainly be a case for integrating
> > > > autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
> > > > rescaling API. Alternatively, the operator would have to reserve the
> > > > resources somehow.
> > > >
> > > > >Does auto-scaling have a plan to support jobs which are running in a
> > > > session cluster? It might be a different
> > > >
> > > > We are targeting the application deployment mode for the first
> version
> > > but
> > > > the standalone mode can be supported as soon as we have an
> integration
> > > with
> > > > the scheduler.
> > > >
> > > > > # Horizontal scaling V.S. Vertical scaling
> > > >
> > > > True. We left out vertical scaling intentionally. For now we assume
> > CPU /
> > > > memory is set up by the user. While definitely useful, vertical
> scaling
> > > > adds another dimension to the scaling problem which we wanted to
> tackle
> > > > later. I'll update the FLIP to explicitly state that.
> > > >
> > > > -Max
> > > >
> > > >
> > > >
> > > > On Tue, Nov 8, 2022 at 3:59 AM Yang Wang 
> > wrote:
> > > >
> > > > > Thanks for the fruitful discussion and I am really excited to see
> > that
> > > > the
> > > > > auto-scaling really happens for
> > > > >
> > > > > Flink Kubernetes operator. It will be a very important step to make
> > the
> > > > > long-running Flink job more smoothly.
> > > > >
> > > > > I just have some immature ideas and want to share them here.
> > > > >
> > > > >
> > > > > # Resource Reservation
> > > > >
> > > > > Since the current auto-scaling needs to fully redeploy the
> > application,
> > > > it
> > > > > may fail to start due to lack of resources.
> > > > >
> > > > > I know the Kubernetes operator could rollback to the old spec, but
> we
> > > > still
> > > > > waste a lot of time to make things worse.
> > > > >
> > > > > I hope the FLIP-250[1](support customized K8s scheduler) could help
> > in
> > > > this
> > > > > case.
> > > > >
> > > > >
> > > > > # Session cluster
> > > > >
> > > > > Does auto-scaling have a plan to support jobs which are running in
> a
> > > > > session cluster? It might be a different
> > > > >
> > > > > story since we could not use Flink config options to override the
> > job
> > > > > vertex parallelisms. Given that the SessionJob
> > > > >
> > > > > is also a first-class citizen, we need to document the limitation
> if
> > > not
> > > > > support.
> > > > >
> > > > >
> > > > > # Horizontal scaling V.S. Vertical scaling
> > > > >
> > > > > IIUC, the current proposal does not mention vertical scaling. There
> > > might
> > > > > be a chance that the memory/cpu of
> > > > >
> > > > > TaskManager is not configured properly. And this will cause
> > unnecessary
> > > > > multiple scaling executions.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > [1].
> > > > >
> > > > 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-15 Thread Maximilian Michels
+1 If there are no further comments, I'll start a vote thread in the next
few days.

-Max


On Tue, Nov 15, 2022 at 2:06 PM Zheng Yu Chen  wrote:

> @Gyula  Have a  good news, now flip-256 now is finish and merge it .
>  flip-271 discussion seems to have stopped and I wonder if there are any
> other comments. Can we get to the polls and start this exciting feature 
> Maybe I can get involved in developing this feature
>
>
>
> Gyula Fóra  于2022年11月8日周二 18:46写道:
>
> > I had 2 extra comments to Max's reply:
> >
> > 1. About pre-allocating resources:
> > This could be done through the operator when the standalone deployment
> mode
> > is used relatively easily as there we have better control of
> > pods/resources.
> >
> > 2. Session jobs:
> > There is a FLIP (
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api
> > )
> > to support passing configuration when we submit jobs to the session
> cluster
> > through the rest api. Once that goes through, session jobs can also be
> > scaled in a similar way through the configuration.
> >
> > Cheers,
> > Gyula
> >
> >
> > On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels 
> wrote:
> >
> > > @Yang
> > >
> > > >Since the current auto-scaling needs to fully redeploy the
> application,
> > it
> > > may fail to start due to lack of resources.
> > >
> > > Great suggestions. I agree that we will have to have to preallocate /
> > > reserve resources to ensure the rescaling doesn't take longer as
> > expected.
> > > This is not only a problem when scaling up but also when scaling down
> > > because any pods surrendered might be taken over by another deployment
> > > during the rescaling. This would certainly be a case for integrating
> > > autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
> > > rescaling API. Alternatively, the operator would have to reserve the
> > > resources somehow.
> > >
> > > >Does auto-scaling have a plan to support jobs which are running in a
> > > session cluster? It might be a different
> > >
> > > We are targeting the application deployment mode for the first version
> > but
> > > the standalone mode can be supported as soon as we have an integration
> > with
> > > the scheduler.
> > >
> > > > # Horizontal scaling V.S. Vertical scaling
> > >
> > > True. We left out vertical scaling intentionally. For now we assume
> CPU /
> > > memory is set up by the user. While definitely useful, vertical scaling
> > > adds another dimension to the scaling problem which we wanted to tackle
> > > later. I'll update the FLIP to explicitly state that.
> > >
> > > -Max
> > >
> > >
> > >
> > > On Tue, Nov 8, 2022 at 3:59 AM Yang Wang 
> wrote:
> > >
> > > > Thanks for the fruitful discussion and I am really excited to see
> that
> > > the
> > > > auto-scaling really happens for
> > > >
> > > > Flink Kubernetes operator. It will be a very important step to make
> the
> > > > long-running Flink job more smoothly.
> > > >
> > > > I just have some immature ideas and want to share them here.
> > > >
> > > >
> > > > # Resource Reservation
> > > >
> > > > Since the current auto-scaling needs to fully redeploy the
> application,
> > > it
> > > > may fail to start due to lack of resources.
> > > >
> > > > I know the Kubernetes operator could rollback to the old spec, but we
> > > still
> > > > waste a lot of time to make things worse.
> > > >
> > > > I hope the FLIP-250[1](support customized K8s scheduler) could help
> in
> > > this
> > > > case.
> > > >
> > > >
> > > > # Session cluster
> > > >
> > > > Does auto-scaling have a plan to support jobs which are running in a
> > > > session cluster? It might be a different
> > > >
> > > > story since we could not use Flink config options to override the
> job
> > > > vertex parallelisms. Given that the SessionJob
> > > >
> > > > is also a first-class citizen, we need to document the limitation if
> > not
> > > > support.
> > > >
> > > >
> > > > # Horizontal scaling V.S. Vertical scaling
> > > >
> > > > IIUC, the current proposal does not mention vertical scaling. There
> > might
> > > > be a chance that the memory/cpu of
> > > >
> > > > TaskManager is not configured properly. And this will cause
> unnecessary
> > > > multiple scaling executions.
> > > >
> > > >
> > > >
> > > >
> > > > [1].
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
> > > >
> > > >
> > > >
> > > > Best,
> > > >
> > > > Yang
> > > >
> > > > Maximilian Michels  于2022年11月8日周二 00:31写道:
> > > >
> > > > > Thanks for all the interest here and for the great remarks! Gyula
> > > > > already did a great job addressing the questions here. Let me try
> to
> > > > > add additional context:
> > > > >
> > > > > @Biao Geng:
> > > > >
> > > > > >1.  For source parallelisms, if the user configure a much larger
> > value
> > > > > than normal, there should be very little pending records though it
> is
> 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-15 Thread Zheng Yu Chen
@Gyula  Have a  good news, now flip-256 now is finish and merge it .
 flip-271 discussion seems to have stopped and I wonder if there are any
other comments. Can we get to the polls and start this exciting feature 
Maybe I can get involved in developing this feature



Gyula Fóra  于2022年11月8日周二 18:46写道:

> I had 2 extra comments to Max's reply:
>
> 1. About pre-allocating resources:
> This could be done through the operator when the standalone deployment mode
> is used relatively easily as there we have better control of
> pods/resources.
>
> 2. Session jobs:
> There is a FLIP (
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api
> )
> to support passing configuration when we submit jobs to the session cluster
> through the rest api. Once that goes through, session jobs can also be
> scaled in a similar way through the configuration.
>
> Cheers,
> Gyula
>
>
> On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels  wrote:
>
> > @Yang
> >
> > >Since the current auto-scaling needs to fully redeploy the application,
> it
> > may fail to start due to lack of resources.
> >
> > Great suggestions. I agree that we will have to have to preallocate /
> > reserve resources to ensure the rescaling doesn't take longer as
> expected.
> > This is not only a problem when scaling up but also when scaling down
> > because any pods surrendered might be taken over by another deployment
> > during the rescaling. This would certainly be a case for integrating
> > autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
> > rescaling API. Alternatively, the operator would have to reserve the
> > resources somehow.
> >
> > >Does auto-scaling have a plan to support jobs which are running in a
> > session cluster? It might be a different
> >
> > We are targeting the application deployment mode for the first version
> but
> > the standalone mode can be supported as soon as we have an integration
> with
> > the scheduler.
> >
> > > # Horizontal scaling V.S. Vertical scaling
> >
> > True. We left out vertical scaling intentionally. For now we assume CPU /
> > memory is set up by the user. While definitely useful, vertical scaling
> > adds another dimension to the scaling problem which we wanted to tackle
> > later. I'll update the FLIP to explicitly state that.
> >
> > -Max
> >
> >
> >
> > On Tue, Nov 8, 2022 at 3:59 AM Yang Wang  wrote:
> >
> > > Thanks for the fruitful discussion and I am really excited to see that
> > the
> > > auto-scaling really happens for
> > >
> > > Flink Kubernetes operator. It will be a very important step to make the
> > > long-running Flink job more smoothly.
> > >
> > > I just have some immature ideas and want to share them here.
> > >
> > >
> > > # Resource Reservation
> > >
> > > Since the current auto-scaling needs to fully redeploy the application,
> > it
> > > may fail to start due to lack of resources.
> > >
> > > I know the Kubernetes operator could rollback to the old spec, but we
> > still
> > > waste a lot of time to make things worse.
> > >
> > > I hope the FLIP-250[1](support customized K8s scheduler) could help in
> > this
> > > case.
> > >
> > >
> > > # Session cluster
> > >
> > > Does auto-scaling have a plan to support jobs which are running in a
> > > session cluster? It might be a different
> > >
> > > story since we could not use Flink config options to override the  job
> > > vertex parallelisms. Given that the SessionJob
> > >
> > > is also a first-class citizen, we need to document the limitation if
> not
> > > support.
> > >
> > >
> > > # Horizontal scaling V.S. Vertical scaling
> > >
> > > IIUC, the current proposal does not mention vertical scaling. There
> might
> > > be a chance that the memory/cpu of
> > >
> > > TaskManager is not configured properly. And this will cause unnecessary
> > > multiple scaling executions.
> > >
> > >
> > >
> > >
> > > [1].
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
> > >
> > >
> > >
> > > Best,
> > >
> > > Yang
> > >
> > > Maximilian Michels  于2022年11月8日周二 00:31写道:
> > >
> > > > Thanks for all the interest here and for the great remarks! Gyula
> > > > already did a great job addressing the questions here. Let me try to
> > > > add additional context:
> > > >
> > > > @Biao Geng:
> > > >
> > > > >1.  For source parallelisms, if the user configure a much larger
> value
> > > > than normal, there should be very little pending records though it is
> > > > possible to get optimized. But IIUC, in current algorithm, we will
> not
> > > take
> > > > actions for this case as the backlog growth rate is almost zero. Is
> the
> > > > understanding right?
> > > >
> > > > This is actually a corner case which we haven't exactly described in
> > > > the FLIP yet. Sources are assumed to only be scaled according to the
> > > > backlog but if there is zero backlog, we don't have a number to
> > > > compute the 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-08 Thread Maximilian Michels
>> # Horizontal scaling V.S. Vertical scaling
>
>True. We left out vertical scaling intentionally. For now we assume CPU /
memory is set up by the user. While definitely useful, vertical scaling
>adds another dimension to the scaling problem which we wanted to tackle
later. I'll update the FLIP to explicitly state that.

Actually, the proposed algorithm is a kind of hybrid between horizontal and
vertical scaling. The vertical scaling aspect is due to the parallelism
adjustments of each task. Pure horizontal scaling would uniformly adjust
the parallelism of all tasks. The motivation there is to optimize the data
flow and prevent backpressure. At the same time, we don't touch the
underlying container resources or the number of task slots per TaskManager
as one typically would in vertical scaling.

On Tue, Nov 8, 2022 at 11:47 AM Gyula Fóra  wrote:

> I had 2 extra comments to Max's reply:
>
> 1. About pre-allocating resources:
> This could be done through the operator when the standalone deployment mode
> is used relatively easily as there we have better control of
> pods/resources.
>
> 2. Session jobs:
> There is a FLIP (
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api
> )
> to support passing configuration when we submit jobs to the session cluster
> through the rest api. Once that goes through, session jobs can also be
> scaled in a similar way through the configuration.
>
> Cheers,
> Gyula
>
>
> On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels  wrote:
>
> > @Yang
> >
> > >Since the current auto-scaling needs to fully redeploy the application,
> it
> > may fail to start due to lack of resources.
> >
> > Great suggestions. I agree that we will have to have to preallocate /
> > reserve resources to ensure the rescaling doesn't take longer as
> expected.
> > This is not only a problem when scaling up but also when scaling down
> > because any pods surrendered might be taken over by another deployment
> > during the rescaling. This would certainly be a case for integrating
> > autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
> > rescaling API. Alternatively, the operator would have to reserve the
> > resources somehow.
> >
> > >Does auto-scaling have a plan to support jobs which are running in a
> > session cluster? It might be a different
> >
> > We are targeting the application deployment mode for the first version
> but
> > the standalone mode can be supported as soon as we have an integration
> with
> > the scheduler.
> >
> > > # Horizontal scaling V.S. Vertical scaling
> >
> > True. We left out vertical scaling intentionally. For now we assume CPU /
> > memory is set up by the user. While definitely useful, vertical scaling
> > adds another dimension to the scaling problem which we wanted to tackle
> > later. I'll update the FLIP to explicitly state that.
> >
> > -Max
> >
> >
> >
> > On Tue, Nov 8, 2022 at 3:59 AM Yang Wang  wrote:
> >
> > > Thanks for the fruitful discussion and I am really excited to see that
> > the
> > > auto-scaling really happens for
> > >
> > > Flink Kubernetes operator. It will be a very important step to make the
> > > long-running Flink job more smoothly.
> > >
> > > I just have some immature ideas and want to share them here.
> > >
> > >
> > > # Resource Reservation
> > >
> > > Since the current auto-scaling needs to fully redeploy the application,
> > it
> > > may fail to start due to lack of resources.
> > >
> > > I know the Kubernetes operator could rollback to the old spec, but we
> > still
> > > waste a lot of time to make things worse.
> > >
> > > I hope the FLIP-250[1](support customized K8s scheduler) could help in
> > this
> > > case.
> > >
> > >
> > > # Session cluster
> > >
> > > Does auto-scaling have a plan to support jobs which are running in a
> > > session cluster? It might be a different
> > >
> > > story since we could not use Flink config options to override the  job
> > > vertex parallelisms. Given that the SessionJob
> > >
> > > is also a first-class citizen, we need to document the limitation if
> not
> > > support.
> > >
> > >
> > > # Horizontal scaling V.S. Vertical scaling
> > >
> > > IIUC, the current proposal does not mention vertical scaling. There
> might
> > > be a chance that the memory/cpu of
> > >
> > > TaskManager is not configured properly. And this will cause unnecessary
> > > multiple scaling executions.
> > >
> > >
> > >
> > >
> > > [1].
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
> > >
> > >
> > >
> > > Best,
> > >
> > > Yang
> > >
> > > Maximilian Michels  于2022年11月8日周二 00:31写道:
> > >
> > > > Thanks for all the interest here and for the great remarks! Gyula
> > > > already did a great job addressing the questions here. Let me try to
> > > > add additional context:
> > > >
> > > > @Biao Geng:
> > > >
> > > > >1.  For source parallelisms, if the user configure a much 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-08 Thread Gyula Fóra
I had 2 extra comments to Max's reply:

1. About pre-allocating resources:
This could be done through the operator when the standalone deployment mode
is used relatively easily as there we have better control of pods/resources.

2. Session jobs:
There is a FLIP (
https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api
)
to support passing configuration when we submit jobs to the session cluster
through the rest api. Once that goes through, session jobs can also be
scaled in a similar way through the configuration.

Cheers,
Gyula


On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels  wrote:

> @Yang
>
> >Since the current auto-scaling needs to fully redeploy the application, it
> may fail to start due to lack of resources.
>
> Great suggestions. I agree that we will have to have to preallocate /
> reserve resources to ensure the rescaling doesn't take longer as expected.
> This is not only a problem when scaling up but also when scaling down
> because any pods surrendered might be taken over by another deployment
> during the rescaling. This would certainly be a case for integrating
> autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
> rescaling API. Alternatively, the operator would have to reserve the
> resources somehow.
>
> >Does auto-scaling have a plan to support jobs which are running in a
> session cluster? It might be a different
>
> We are targeting the application deployment mode for the first version but
> the standalone mode can be supported as soon as we have an integration with
> the scheduler.
>
> > # Horizontal scaling V.S. Vertical scaling
>
> True. We left out vertical scaling intentionally. For now we assume CPU /
> memory is set up by the user. While definitely useful, vertical scaling
> adds another dimension to the scaling problem which we wanted to tackle
> later. I'll update the FLIP to explicitly state that.
>
> -Max
>
>
>
> On Tue, Nov 8, 2022 at 3:59 AM Yang Wang  wrote:
>
> > Thanks for the fruitful discussion and I am really excited to see that
> the
> > auto-scaling really happens for
> >
> > Flink Kubernetes operator. It will be a very important step to make the
> > long-running Flink job more smoothly.
> >
> > I just have some immature ideas and want to share them here.
> >
> >
> > # Resource Reservation
> >
> > Since the current auto-scaling needs to fully redeploy the application,
> it
> > may fail to start due to lack of resources.
> >
> > I know the Kubernetes operator could rollback to the old spec, but we
> still
> > waste a lot of time to make things worse.
> >
> > I hope the FLIP-250[1](support customized K8s scheduler) could help in
> this
> > case.
> >
> >
> > # Session cluster
> >
> > Does auto-scaling have a plan to support jobs which are running in a
> > session cluster? It might be a different
> >
> > story since we could not use Flink config options to override the  job
> > vertex parallelisms. Given that the SessionJob
> >
> > is also a first-class citizen, we need to document the limitation if not
> > support.
> >
> >
> > # Horizontal scaling V.S. Vertical scaling
> >
> > IIUC, the current proposal does not mention vertical scaling. There might
> > be a chance that the memory/cpu of
> >
> > TaskManager is not configured properly. And this will cause unnecessary
> > multiple scaling executions.
> >
> >
> >
> >
> > [1].
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
> >
> >
> >
> > Best,
> >
> > Yang
> >
> > Maximilian Michels  于2022年11月8日周二 00:31写道:
> >
> > > Thanks for all the interest here and for the great remarks! Gyula
> > > already did a great job addressing the questions here. Let me try to
> > > add additional context:
> > >
> > > @Biao Geng:
> > >
> > > >1.  For source parallelisms, if the user configure a much larger value
> > > than normal, there should be very little pending records though it is
> > > possible to get optimized. But IIUC, in current algorithm, we will not
> > take
> > > actions for this case as the backlog growth rate is almost zero. Is the
> > > understanding right?
> > >
> > > This is actually a corner case which we haven't exactly described in
> > > the FLIP yet. Sources are assumed to only be scaled according to the
> > > backlog but if there is zero backlog, we don't have a number to
> > > compute the parallelism. In this case we tune the source based on the
> > > utilization, just like we do for the other vertices. That could mean
> > > reducing the parallelism in case the source is not doing any work.
> > > Now, in case there is no backlog, we need to be careful that we don't
> > > bounce back to a higher parallelism afterwards.
> > >
> > > >2.  Compared with “scaling out”, “scaling in” is usually more
> dangerous
> > > as it is more likely to lead to negative influence to the downstream
> > jobs.
> > > The min/max load bounds should be useful. I am wondering if it is
> > 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-08 Thread Maximilian Michels
@Yang

>Since the current auto-scaling needs to fully redeploy the application, it
may fail to start due to lack of resources.

Great suggestions. I agree that we will have to have to preallocate /
reserve resources to ensure the rescaling doesn't take longer as expected.
This is not only a problem when scaling up but also when scaling down
because any pods surrendered might be taken over by another deployment
during the rescaling. This would certainly be a case for integrating
autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
rescaling API. Alternatively, the operator would have to reserve the
resources somehow.

>Does auto-scaling have a plan to support jobs which are running in a
session cluster? It might be a different

We are targeting the application deployment mode for the first version but
the standalone mode can be supported as soon as we have an integration with
the scheduler.

> # Horizontal scaling V.S. Vertical scaling

True. We left out vertical scaling intentionally. For now we assume CPU /
memory is set up by the user. While definitely useful, vertical scaling
adds another dimension to the scaling problem which we wanted to tackle
later. I'll update the FLIP to explicitly state that.

-Max



On Tue, Nov 8, 2022 at 3:59 AM Yang Wang  wrote:

> Thanks for the fruitful discussion and I am really excited to see that the
> auto-scaling really happens for
>
> Flink Kubernetes operator. It will be a very important step to make the
> long-running Flink job more smoothly.
>
> I just have some immature ideas and want to share them here.
>
>
> # Resource Reservation
>
> Since the current auto-scaling needs to fully redeploy the application, it
> may fail to start due to lack of resources.
>
> I know the Kubernetes operator could rollback to the old spec, but we still
> waste a lot of time to make things worse.
>
> I hope the FLIP-250[1](support customized K8s scheduler) could help in this
> case.
>
>
> # Session cluster
>
> Does auto-scaling have a plan to support jobs which are running in a
> session cluster? It might be a different
>
> story since we could not use Flink config options to override the  job
> vertex parallelisms. Given that the SessionJob
>
> is also a first-class citizen, we need to document the limitation if not
> support.
>
>
> # Horizontal scaling V.S. Vertical scaling
>
> IIUC, the current proposal does not mention vertical scaling. There might
> be a chance that the memory/cpu of
>
> TaskManager is not configured properly. And this will cause unnecessary
> multiple scaling executions.
>
>
>
>
> [1].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
>
>
>
> Best,
>
> Yang
>
> Maximilian Michels  于2022年11月8日周二 00:31写道:
>
> > Thanks for all the interest here and for the great remarks! Gyula
> > already did a great job addressing the questions here. Let me try to
> > add additional context:
> >
> > @Biao Geng:
> >
> > >1.  For source parallelisms, if the user configure a much larger value
> > than normal, there should be very little pending records though it is
> > possible to get optimized. But IIUC, in current algorithm, we will not
> take
> > actions for this case as the backlog growth rate is almost zero. Is the
> > understanding right?
> >
> > This is actually a corner case which we haven't exactly described in
> > the FLIP yet. Sources are assumed to only be scaled according to the
> > backlog but if there is zero backlog, we don't have a number to
> > compute the parallelism. In this case we tune the source based on the
> > utilization, just like we do for the other vertices. That could mean
> > reducing the parallelism in case the source is not doing any work.
> > Now, in case there is no backlog, we need to be careful that we don't
> > bounce back to a higher parallelism afterwards.
> >
> > >2.  Compared with “scaling out”, “scaling in” is usually more dangerous
> > as it is more likely to lead to negative influence to the downstream
> jobs.
> > The min/max load bounds should be useful. I am wondering if it is
> possible
> > to have different strategy for “scaling in” to make it more conservative.
> > Or more eagerly, allow custom autoscaling strategy(e.g. time-based
> > strategy).
> >
> > Gyula already mentioned the bounded scale-down. Additionally, we could
> > add more conservative utilization targets for scale down. For example,
> > if we targeted 60% utilization for scale-up, we might target 30%
> > utilization for scale-down, essentially reducing the parallelism
> > slower. Same as with the limited parallelism scale-down, in the worst
> > case this will require multiple scale downs. Ideally, the metrics
> > should be reliable enough such that we do not require such
> > workarounds.
> >
> > @JunRui Lee:
> >
> > >In the document, I didn't find the definition of when to trigger
> > autoScaling after some jobVertex reach the threshold. If I missed is,
> > please let me know.
> >
> > The 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-07 Thread Yang Wang
Thanks for the fruitful discussion and I am really excited to see that the
auto-scaling really happens for

Flink Kubernetes operator. It will be a very important step to make the
long-running Flink job more smoothly.

I just have some immature ideas and want to share them here.


# Resource Reservation

Since the current auto-scaling needs to fully redeploy the application, it
may fail to start due to lack of resources.

I know the Kubernetes operator could rollback to the old spec, but we still
waste a lot of time to make things worse.

I hope the FLIP-250[1](support customized K8s scheduler) could help in this
case.


# Session cluster

Does auto-scaling have a plan to support jobs which are running in a
session cluster? It might be a different

story since we could not use Flink config options to override the  job
vertex parallelisms. Given that the SessionJob

is also a first-class citizen, we need to document the limitation if not
support.


# Horizontal scaling V.S. Vertical scaling

IIUC, the current proposal does not mention vertical scaling. There might
be a chance that the memory/cpu of

TaskManager is not configured properly. And this will cause unnecessary
multiple scaling executions.




[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal



Best,

Yang

Maximilian Michels  于2022年11月8日周二 00:31写道:

> Thanks for all the interest here and for the great remarks! Gyula
> already did a great job addressing the questions here. Let me try to
> add additional context:
>
> @Biao Geng:
>
> >1.  For source parallelisms, if the user configure a much larger value
> than normal, there should be very little pending records though it is
> possible to get optimized. But IIUC, in current algorithm, we will not take
> actions for this case as the backlog growth rate is almost zero. Is the
> understanding right?
>
> This is actually a corner case which we haven't exactly described in
> the FLIP yet. Sources are assumed to only be scaled according to the
> backlog but if there is zero backlog, we don't have a number to
> compute the parallelism. In this case we tune the source based on the
> utilization, just like we do for the other vertices. That could mean
> reducing the parallelism in case the source is not doing any work.
> Now, in case there is no backlog, we need to be careful that we don't
> bounce back to a higher parallelism afterwards.
>
> >2.  Compared with “scaling out”, “scaling in” is usually more dangerous
> as it is more likely to lead to negative influence to the downstream jobs.
> The min/max load bounds should be useful. I am wondering if it is possible
> to have different strategy for “scaling in” to make it more conservative.
> Or more eagerly, allow custom autoscaling strategy(e.g. time-based
> strategy).
>
> Gyula already mentioned the bounded scale-down. Additionally, we could
> add more conservative utilization targets for scale down. For example,
> if we targeted 60% utilization for scale-up, we might target 30%
> utilization for scale-down, essentially reducing the parallelism
> slower. Same as with the limited parallelism scale-down, in the worst
> case this will require multiple scale downs. Ideally, the metrics
> should be reliable enough such that we do not require such
> workarounds.
>
> @JunRui Lee:
>
> >In the document, I didn't find the definition of when to trigger
> autoScaling after some jobVertex reach the threshold. If I missed is,
> please let me know.
>
> The triggering is supposed to work based on the number of metric
> reports to aggregate and the cool down time. Additionally, there are
> boundaries for the target rates such that we don't scale on tiny
> deviations of the rates. I agree that we want to prevent unnecessary
> scalings as much as possible. We'll expand on that.
>
> @Pedro Silva:
>
> >Have you considered making metrics collection getting triggered based on
> events rather than periodic checks?
>
> Ideally we want to continuously monitor the job to be able to find
> bottlenecks. Based on the metrics, we will decide whether to scale or
> not. However, if we find that the continuous monitoring is too costly,
> we might do it based on signals. Also, if there is some key-turn event
> that we must refresh our metrics for, that could also be interesting.
> A sudden spike in the backlog could warrant that.
>
> > Could the FLIP also be used to auto-scale based on state-level metrics
> at an operator level?
>
> It could but we don't want to modify the JobGraph which means we are
> bound to using task-level parallelism. Setting operator level
> parallelism would mean rebuilding the JobGraph which is a tricky thing
> to do. It would increase the solution space but also the complexity of
> finding a stable scaling configuration.
>
> @Zheng:
>
> >After the user opens (advcie), it does not actually perform AutoScaling.
> It only outputs the notification form of tuning suggestions for the user's
> reference.

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-07 Thread Maximilian Michels
Thanks for all the interest here and for the great remarks! Gyula
already did a great job addressing the questions here. Let me try to
add additional context:

@Biao Geng:

>1.  For source parallelisms, if the user configure a much larger value than 
>normal, there should be very little pending records though it is possible to 
>get optimized. But IIUC, in current algorithm, we will not take actions for 
>this case as the backlog growth rate is almost zero. Is the understanding 
>right?

This is actually a corner case which we haven't exactly described in
the FLIP yet. Sources are assumed to only be scaled according to the
backlog but if there is zero backlog, we don't have a number to
compute the parallelism. In this case we tune the source based on the
utilization, just like we do for the other vertices. That could mean
reducing the parallelism in case the source is not doing any work.
Now, in case there is no backlog, we need to be careful that we don't
bounce back to a higher parallelism afterwards.

>2.  Compared with “scaling out”, “scaling in” is usually more dangerous as it 
>is more likely to lead to negative influence to the downstream jobs. The 
>min/max load bounds should be useful. I am wondering if it is possible to have 
>different strategy for “scaling in” to make it more conservative. Or more 
>eagerly, allow custom autoscaling strategy(e.g. time-based strategy).

Gyula already mentioned the bounded scale-down. Additionally, we could
add more conservative utilization targets for scale down. For example,
if we targeted 60% utilization for scale-up, we might target 30%
utilization for scale-down, essentially reducing the parallelism
slower. Same as with the limited parallelism scale-down, in the worst
case this will require multiple scale downs. Ideally, the metrics
should be reliable enough such that we do not require such
workarounds.

@JunRui Lee:

>In the document, I didn't find the definition of when to trigger autoScaling 
>after some jobVertex reach the threshold. If I missed is, please let me know.

The triggering is supposed to work based on the number of metric
reports to aggregate and the cool down time. Additionally, there are
boundaries for the target rates such that we don't scale on tiny
deviations of the rates. I agree that we want to prevent unnecessary
scalings as much as possible. We'll expand on that.

@Pedro Silva:

>Have you considered making metrics collection getting triggered based on 
>events rather than periodic checks?

Ideally we want to continuously monitor the job to be able to find
bottlenecks. Based on the metrics, we will decide whether to scale or
not. However, if we find that the continuous monitoring is too costly,
we might do it based on signals. Also, if there is some key-turn event
that we must refresh our metrics for, that could also be interesting.
A sudden spike in the backlog could warrant that.

> Could the FLIP also be used to auto-scale based on state-level metrics at an 
> operator level?

It could but we don't want to modify the JobGraph which means we are
bound to using task-level parallelism. Setting operator level
parallelism would mean rebuilding the JobGraph which is a tricky thing
to do. It would increase the solution space but also the complexity of
finding a stable scaling configuration.

@Zheng:

>After the user opens (advcie), it does not actually perform AutoScaling. It 
>only outputs the notification form of tuning suggestions for the user's 
>reference.

That's a great idea. Such a "dry run" feature would give users a
better sense of how the autoscaler would work.

>I found that FFA 2020 Netflix has a related topic discussing the automatic 
>tuning function

Thanks! They actually use a similar idea with respect to backlog-based
scaling. Where their approach differs is that they scale the entire
job based on a target load instead of scaling vertices individually.
They have some interesting ideas like the lookup table for past
scalings and the load prediction based on regressions. I have
intentionally left out those optimizations but I think they can be
useful if implemented well.

>Can we provide some interfaces for users to customize and implement some 
>tuning algorithms?

Certainly, I think it is critical that we provide a default
implementation that works well for all kinds of Flink jobs. But users
should have the option to plug in their own implementation. This will
especially be useful for new sources which might not have the backlog
information available as some of the built-in ones like Kafka.

@Dong:

>For example, suppose a Kafka Sink subtask has reached I/O bottleneck
when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
reach 1 sec?

The busyTimeMsPerSecond metric captures all the work of the task
excluding backpressure time. So a sink's IO work would be included,
unless it is non-blocking. Typically sinks write to external systems
which can itself backpressure, and should to avoid the sink to be

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-07 Thread Gyula Fóra
@Dong:

Looking at the busyTime metrics in the TaskOMetricGroup it seems that busy
time is actually defined as "not idle or (soft) backpressured" . So I think
it would give us the correct reading based on what you said about the Kafka
sink.

In any case we have to test this and if something is not as we expect we
should fix the metric.

Gyula

On Mon, Nov 7, 2022 at 5:00 PM Dong Lin  wrote:

> Thanks for the explanation Gyula. Please see my reply inline.
>
> BTW, has the proposed solution been deployed and evaluated with any
> production workload? If yes, I am wondering if you could share the
> experience, e.g. what is the likelihood of having regression and
> improvement respectively after enabling this feature.
>
>
> On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra  wrote:
>
>> Hi Dong!
>>
>> Let me try to answer the questions :)
>>
>> 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time
>> spent in the main record processing loop for an operator if I
>> understand correctly. This includes IO operations too.
>>
>
> I took a look at the StreamTask::processInput(...)
> .
> My understanding of this code is that when KafkaSink (same for other sinks)
> can not write data out to the network fast enough, recordWriter
> 
> becomes unavailable and the StreamTask is considered to be
> soft-back-pressured, instead of being considered busy.
>
> If this is the case, then the algorithm currently proposed in the FLIP
> might under-provision the sink operator's parallelism when the sink
> operator is the bottleneck. I am not an expert with this piece of code
> though. I am wondering if you or someone else could double-check this.
>
>
>> 2: We should add this to the FLIP I agree. It would be a Duration config
>> with the expected catch up time after rescaling (let's say 5 minutes). It
>> could be computed based on the current data rate and the calculated max
>> processing rate after the rescale.
>>
>> Great. I am looking forward to the formula :)
>
>
>> 3: In the current proposal we don't have per operator configs. Target
>> utilization would apply to all operators uniformly.
>>
>> 4: It should be configurable, yes.
>>
>> 5,6: The names haven't been finalized but I think these are minor
>> details. We could add concrete names to the FLIP :)
>>
>> Sounds good.
>
>
>> Cheers,
>> Gyula
>>
>>
>> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:
>>
>>> Hi Max,
>>>
>>> Thank you for the proposal. The proposal tackles a very important issue
>>> for Flink users and the design looks promising overall!
>>>
>>> I have some questions to better understand the proposed public
>>> interfaces and the algorithm.
>>>
>>> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
>>> could reach 1 sec. I believe this is mostly true for cpu-bound operators.
>>> Could you confirm that this can also be true for io-bound operators such as
>>> sinks? For example, suppose a Kafka Sink subtask has reached I/O bottleneck
>>> when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
>>> reach 1 sec?
>>>
>>> 2) It is said that "users can configure a maximum time to fully process
>>> the backlog". The configuration section does not seem to provide this
>>> config. Could you specify this? And any chance this proposal can provide
>>> the formula for calculating the new processing rate?
>>>
>>> 3) How are users expected to specify the per-operator configs (e.g.
>>> target utilization)? For example, should users specify it programmatically
>>> in a DataStream/Table/SQL API?
>>>
>>> 4) How often will the Flink Kubernetes operator query metrics from
>>> JobManager? Is this configurable?
>>>
>>> 5) Could you specify the config name and default value for the proposed
>>> configs?
>>>
>>> 6) Could you add the name/mbean/type for the proposed metrics?
>>>
>>>
>>> Cheers,
>>> Dong
>>>
>>>
>>>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-07 Thread Dong Lin
Thanks for the explanation Gyula. Please see my reply inline.

BTW, has the proposed solution been deployed and evaluated with any
production workload? If yes, I am wondering if you could share the
experience, e.g. what is the likelihood of having regression and
improvement respectively after enabling this feature.


On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra  wrote:

> Hi Dong!
>
> Let me try to answer the questions :)
>
> 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time
> spent in the main record processing loop for an operator if I
> understand correctly. This includes IO operations too.
>

I took a look at the StreamTask::processInput(...)
.
My understanding of this code is that when KafkaSink (same for other sinks)
can not write data out to the network fast enough, recordWriter

becomes unavailable and the StreamTask is considered to be
soft-back-pressured, instead of being considered busy.

If this is the case, then the algorithm currently proposed in the FLIP
might under-provision the sink operator's parallelism when the sink
operator is the bottleneck. I am not an expert with this piece of code
though. I am wondering if you or someone else could double-check this.


> 2: We should add this to the FLIP I agree. It would be a Duration config
> with the expected catch up time after rescaling (let's say 5 minutes). It
> could be computed based on the current data rate and the calculated max
> processing rate after the rescale.
>
> Great. I am looking forward to the formula :)


> 3: In the current proposal we don't have per operator configs. Target
> utilization would apply to all operators uniformly.
>
> 4: It should be configurable, yes.
>
> 5,6: The names haven't been finalized but I think these are minor details.
> We could add concrete names to the FLIP :)
>
> Sounds good.


> Cheers,
> Gyula
>
>
> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:
>
>> Hi Max,
>>
>> Thank you for the proposal. The proposal tackles a very important issue
>> for Flink users and the design looks promising overall!
>>
>> I have some questions to better understand the proposed public interfaces
>> and the algorithm.
>>
>> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
>> could reach 1 sec. I believe this is mostly true for cpu-bound operators.
>> Could you confirm that this can also be true for io-bound operators such as
>> sinks? For example, suppose a Kafka Sink subtask has reached I/O bottleneck
>> when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
>> reach 1 sec?
>>
>> 2) It is said that "users can configure a maximum time to fully process
>> the backlog". The configuration section does not seem to provide this
>> config. Could you specify this? And any chance this proposal can provide
>> the formula for calculating the new processing rate?
>>
>> 3) How are users expected to specify the per-operator configs (e.g.
>> target utilization)? For example, should users specify it programmatically
>> in a DataStream/Table/SQL API?
>>
>> 4) How often will the Flink Kubernetes operator query metrics from
>> JobManager? Is this configurable?
>>
>> 5) Could you specify the config name and default value for the proposed
>> configs?
>>
>> 6) Could you add the name/mbean/type for the proposed metrics?
>>
>>
>> Cheers,
>> Dong
>>
>>
>>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-07 Thread JunRui Lee
@Guyla,

Thanks for the explanation and the follow up actions. That sounds good to
me.

Thanks,
JunRui Lee

Yanfei Lei  于2022年11月7日周一 12:20写道:

> Hi Max,
>
> Thanks for the proposal. This proposal makes Flink better adapted to
> cloud-native applications!
>
> After reading the FLIP, I'm curious about some points:
>
> 1) It's said that "The first step is collecting metrics for all JobVertices
> by combining metrics from all the runtime subtasks and computing the
> *average*". When the load of the subtasks of an operator is not balanced,
> do we need to trigger autoScaling? Has the median or some percentiles been
> considered?
> 2) IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this proposal,
> will we reuse some logic from Reactive Mode?
>
> Best,
> Yanfei
>
> Gyula Fóra  于2022年11月7日周一 02:33写道:
>
> > Hi Dong!
> >
> > Let me try to answer the questions :)
> >
> > 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time
> spent
> > in the main record processing loop for an operator if I
> > understand correctly. This includes IO operations too.
> >
> > 2: We should add this to the FLIP I agree. It would be a Duration config
> > with the expected catch up time after rescaling (let's say 5 minutes). It
> > could be computed based on the current data rate and the calculated max
> > processing rate after the rescale.
> >
> > 3: In the current proposal we don't have per operator configs. Target
> > utilization would apply to all operators uniformly.
> >
> > 4: It should be configurable, yes.
> >
> > 5,6: The names haven't been finalized but I think these are minor
> details.
> > We could add concrete names to the FLIP :)
> >
> > Cheers,
> > Gyula
> >
> >
> > On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:
> >
> > > Hi Max,
> > >
> > > Thank you for the proposal. The proposal tackles a very important issue
> > > for Flink users and the design looks promising overall!
> > >
> > > I have some questions to better understand the proposed public
> interfaces
> > > and the algorithm.
> > >
> > > 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
> > > could reach 1 sec. I believe this is mostly true for cpu-bound
> operators.
> > > Could you confirm that this can also be true for io-bound operators
> such
> > as
> > > sinks? For example, suppose a Kafka Sink subtask has reached I/O
> > bottleneck
> > > when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> > > reach 1 sec?
> > >
> > > 2) It is said that "users can configure a maximum time to fully process
> > > the backlog". The configuration section does not seem to provide this
> > > config. Could you specify this? And any chance this proposal can
> provide
> > > the formula for calculating the new processing rate?
> > >
> > > 3) How are users expected to specify the per-operator configs (e.g.
> > target
> > > utilization)? For example, should users specify it programmatically in
> a
> > > DataStream/Table/SQL API?
> > >
> > > 4) How often will the Flink Kubernetes operator query metrics from
> > > JobManager? Is this configurable?
> > >
> > > 5) Could you specify the config name and default value for the proposed
> > > configs?
> > >
> > > 6) Could you add the name/mbean/type for the proposed metrics?
> > >
> > >
> > > Cheers,
> > > Dong
> > >
> > >
> > >
> >
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-06 Thread Yanfei Lei
Hi Max,

Thanks for the proposal. This proposal makes Flink better adapted to
cloud-native applications!

After reading the FLIP, I'm curious about some points:

1) It's said that "The first step is collecting metrics for all JobVertices
by combining metrics from all the runtime subtasks and computing the
*average*". When the load of the subtasks of an operator is not balanced,
do we need to trigger autoScaling? Has the median or some percentiles been
considered?
2) IIUC, "FLIP-159: Reactive Mode" is somewhat similar to this proposal,
will we reuse some logic from Reactive Mode?

Best,
Yanfei

Gyula Fóra  于2022年11月7日周一 02:33写道:

> Hi Dong!
>
> Let me try to answer the questions :)
>
> 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time spent
> in the main record processing loop for an operator if I
> understand correctly. This includes IO operations too.
>
> 2: We should add this to the FLIP I agree. It would be a Duration config
> with the expected catch up time after rescaling (let's say 5 minutes). It
> could be computed based on the current data rate and the calculated max
> processing rate after the rescale.
>
> 3: In the current proposal we don't have per operator configs. Target
> utilization would apply to all operators uniformly.
>
> 4: It should be configurable, yes.
>
> 5,6: The names haven't been finalized but I think these are minor details.
> We could add concrete names to the FLIP :)
>
> Cheers,
> Gyula
>
>
> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:
>
> > Hi Max,
> >
> > Thank you for the proposal. The proposal tackles a very important issue
> > for Flink users and the design looks promising overall!
> >
> > I have some questions to better understand the proposed public interfaces
> > and the algorithm.
> >
> > 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
> > could reach 1 sec. I believe this is mostly true for cpu-bound operators.
> > Could you confirm that this can also be true for io-bound operators such
> as
> > sinks? For example, suppose a Kafka Sink subtask has reached I/O
> bottleneck
> > when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> > reach 1 sec?
> >
> > 2) It is said that "users can configure a maximum time to fully process
> > the backlog". The configuration section does not seem to provide this
> > config. Could you specify this? And any chance this proposal can provide
> > the formula for calculating the new processing rate?
> >
> > 3) How are users expected to specify the per-operator configs (e.g.
> target
> > utilization)? For example, should users specify it programmatically in a
> > DataStream/Table/SQL API?
> >
> > 4) How often will the Flink Kubernetes operator query metrics from
> > JobManager? Is this configurable?
> >
> > 5) Could you specify the config name and default value for the proposed
> > configs?
> >
> > 6) Could you add the name/mbean/type for the proposed metrics?
> >
> >
> > Cheers,
> > Dong
> >
> >
> >
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-06 Thread Gyula Fóra
Hi Dong!

Let me try to answer the questions :)

1 : busyTimeMsPerSecond is not specific for CPU, it measures the time spent
in the main record processing loop for an operator if I
understand correctly. This includes IO operations too.

2: We should add this to the FLIP I agree. It would be a Duration config
with the expected catch up time after rescaling (let's say 5 minutes). It
could be computed based on the current data rate and the calculated max
processing rate after the rescale.

3: In the current proposal we don't have per operator configs. Target
utilization would apply to all operators uniformly.

4: It should be configurable, yes.

5,6: The names haven't been finalized but I think these are minor details.
We could add concrete names to the FLIP :)

Cheers,
Gyula


On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:

> Hi Max,
>
> Thank you for the proposal. The proposal tackles a very important issue
> for Flink users and the design looks promising overall!
>
> I have some questions to better understand the proposed public interfaces
> and the algorithm.
>
> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
> could reach 1 sec. I believe this is mostly true for cpu-bound operators.
> Could you confirm that this can also be true for io-bound operators such as
> sinks? For example, suppose a Kafka Sink subtask has reached I/O bottleneck
> when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> reach 1 sec?
>
> 2) It is said that "users can configure a maximum time to fully process
> the backlog". The configuration section does not seem to provide this
> config. Could you specify this? And any chance this proposal can provide
> the formula for calculating the new processing rate?
>
> 3) How are users expected to specify the per-operator configs (e.g. target
> utilization)? For example, should users specify it programmatically in a
> DataStream/Table/SQL API?
>
> 4) How often will the Flink Kubernetes operator query metrics from
> JobManager? Is this configurable?
>
> 5) Could you specify the config name and default value for the proposed
> configs?
>
> 6) Could you add the name/mbean/type for the proposed metrics?
>
>
> Cheers,
> Dong
>
>
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-06 Thread Dong Lin
Hi Max,

Thank you for the proposal. The proposal tackles a very important issue for
Flink users and the design looks promising overall!

I have some questions to better understand the proposed public interfaces
and the algorithm.

1) The proposal seems to assume that the operator's busyTimeMsPerSecond
could reach 1 sec. I believe this is mostly true for cpu-bound operators.
Could you confirm that this can also be true for io-bound operators such as
sinks? For example, suppose a Kafka Sink subtask has reached I/O bottleneck
when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
reach 1 sec?

2) It is said that "users can configure a maximum time to fully process the
backlog". The configuration section does not seem to provide this config.
Could you specify this? And any chance this proposal can provide the
formula for calculating the new processing rate?

3) How are users expected to specify the per-operator configs (e.g. target
utilization)? For example, should users specify it programmatically in a
DataStream/Table/SQL API?

4) How often will the Flink Kubernetes operator query metrics from
JobManager? Is this configurable?

5) Could you specify the config name and default value for the proposed
configs?

6) Could you add the name/mbean/type for the proposed metrics?


Cheers,
Dong


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-06 Thread Gyula Fóra
@Pedro:
The current design focuses on record processing time metrics. In most cases
when we need to scale (such as too much state per operator), record
processing time actually slows, so it would detect that. Of course in the
future we can add new logic if we see something missing.

@ConradJam:
We agree that the on/off config for the actual scaling is very important.
The autoscaler module would then publish all metrics and parallelism advice
without actually taking action. We should definitely have this.
Thanks for the pointer to the talk, we have considered that approach but
it's mostly suitable for very simple pipelines. We hope that our approach
would generalize better to complex applications and would also cover the
simple case in a similar way.

Gyula

On Sun, Nov 6, 2022 at 7:25 AM Zheng Yu Chen  wrote:

> Hi Max
> Thank you for dirver this flip,I have some advice for this flip
>
> Do we not only exist in the (on/off) switch, but also have one more option
> for (advcie).
> After the user opens (advcie), it does not actually perform AutoScaling. It
> only outputs the notification form of tuning suggestions for the user's
> reference. It is up to the user to decide whether to trigger the adjustment
> of the parallelism.I believe that this function is very useful in the
> debugging phase or the observation phase. When the user observes a certain
> period of time, he thinks it is feasible and then turns on the switch.
>
> at the same time, I found that FFA 2020 Netflix has a related topic
> discussing the automatic tuning function
> Attach the video address: Autoscaling Flink at Netflix - Timothy Farkas
>  This may be helpful for us
> to
> complete this function
>
> Here is a description of using some prediction functions to predict the
> operator traffic of this job. Can we provide some interfaces for users to
> customize and implement some tuning algorithms?
>
>
> Maximilian Michels  于2022年11月5日周六 02:37写道:
>
> > Hi,
> >
> > I would like to kick off the discussion on implementing autoscaling for
> > Flink as part of the Flink Kubernetes operator. I've outlined an approach
> > here which I find promising:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
> >
> > I've been discussing this approach with some of the operator
> contributors:
> > Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> > implementation based on the current FLIP design. If that goes well, we
> > would like to contribute this to Flink based on the results of the
> > discussion here.
> >
> > I'm curious to hear your thoughts.
> >
> > -Max
> >
>
>
> --
> Best
>
> ConradJam
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-06 Thread Zheng Yu Chen
Hi Max
Thank you for dirver this flip,I have some advice for this flip

Do we not only exist in the (on/off) switch, but also have one more option
for (advcie).
After the user opens (advcie), it does not actually perform AutoScaling. It
only outputs the notification form of tuning suggestions for the user's
reference. It is up to the user to decide whether to trigger the adjustment
of the parallelism.I believe that this function is very useful in the
debugging phase or the observation phase. When the user observes a certain
period of time, he thinks it is feasible and then turns on the switch.

at the same time, I found that FFA 2020 Netflix has a related topic
discussing the automatic tuning function
Attach the video address: Autoscaling Flink at Netflix - Timothy Farkas
 This may be helpful for us to
complete this function

Here is a description of using some prediction functions to predict the
operator traffic of this job. Can we provide some interfaces for users to
customize and implement some tuning algorithms?


Maximilian Michels  于2022年11月5日周六 02:37写道:

> Hi,
>
> I would like to kick off the discussion on implementing autoscaling for
> Flink as part of the Flink Kubernetes operator. I've outlined an approach
> here which I find promising:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>
> I've been discussing this approach with some of the operator contributors:
> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> implementation based on the current FLIP design. If that goes well, we
> would like to contribute this to Flink based on the results of the
> discussion here.
>
> I'm curious to hear your thoughts.
>
> -Max
>


-- 
Best

ConradJam


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Pedro Silva
@Guyla,

Thank you for your reply, the answer makes perfect sense. I have a follow up if 
that’s ok. 

IIUC this FLIP uses metrics that relate to backpressure at an operator level 
(records in vs out, busy time etc…).
Could the FLIP also be used to auto-scale based on state-level metrics at an 
operator level? 

I.e: keyed state growing too large for a given operator and needing to be 
distributed across more instances of that operator.

Cheers,
Pedro

> 
> On 5 Nov 2022, at 09:47, Gyula Fóra  wrote:
> 
> @JunRui:
> There are 2 pieces that prevent scaling on minor load variations. Firstly
> he algorithm / logic is intended to work on metrics averaged on a
> configured time window (let's say last 5 minutes), this smoothens minor
> variances and results in more stability. Secondly, in addition to the
> utilization target (let's say 75%) there would be a configurable flexible
> bound (for example +/- 10%) and as long as we are within the bound we
> wouldn't trigger scaling. These 2 together should be enough we believe.
> 
> @Pedro:
> Periodic metric collection is very important to get a good overview of the
> system at any given time. Also we need some recent history to make a good
> scaling decision. Reflecting on your suggestion, when the input lag
> increases we are already too late to scale. Ideally the algorithm would
> pick up on a gradual load increase before it even results in actual input
> lag so we can always keep our target utilization level.
> 
> Cheers,
> Gyula
> 
> 
> 
>> On Sat, Nov 5, 2022 at 5:16 PM Pedro Silva  wrote:
>> 
>> Hello,
>> 
>> First of all thank you for tackling this theme, it is massive boon to
>> Flink if it gets in.
>> 
>> Following up on JunRui Lee’s question.
>> Have you considered making metrics collection getting triggered based on
>> events rather than periodic checks?
>> 
>> I.e if input source lag is increasing for the past x amount of time ->
>> trigger a metric collection to understand what to scale, if anything.
>> 
>> For kubernetes loads there is KEDA that does this:
>> https://keda.sh/docs/2.8/scalers/prometheus/
>> 
>> My apologies if the question doesn’t make sense.
>> 
>> Thank you for your time,
>> Pedro Silva
>> 
>>> 
 On 5 Nov 2022, at 08:09, JunRui Lee  wrote:
>>> 
>>> Hi Max,
>>> 
>>> Thanks for writing this FLIP and initiating the discussion.
>>> 
>>> I just have a small question after reading the FLIP:
>>> 
>>> In the document, I didn't find the definition of when to trigger
>>> autoScaling after some jobVertex reach the threshold. If I missed is,
>>> please let me know.
>>> IIUC, the proper triggering rules are necessary to avoid unnecessary
>>> autoscaling caused by temporary large changes in data,
>>> and in this case, it will lead to at least two meaningless resubmissions
>> of
>>> jobs, which will negatively affect users.
>>> 
>>> Thanks,
>>> JunRui Lee
>>> 
>>> Gyula Fóra  于2022年11月5日周六 20:38写道:
>>> 
 Hey!
 Thanks for the input!
 The algorithm does not really differentiate between scaling up or down
>> as
 it’s concerned about finding the right parallelism to match the target
 processing rate with just enough spare capacity.
 Let me try to address your specific points:
 1. The backlog growth rate only matters for computing the target
>> processing
 rate for the sources. If the parallelism is high enough and there is no
 back pressure it will be close to 0 so the target rate is the source
>> read
 rate. This is as intended. If we see that the sources are not busy and
>> they
 can read more than enough the algorithm would scale them down.
 2. You are right , it’s dangerous to scale in too much, so we already
 thought about limiting the scale down amount per scaling step/time
>> window
 to give more safety. But we can definitely think about different
>> strategies
 in the future!
 The observation regarding max parallelism is very important and we
>> should
 always take that into consideration.
 Cheers
 Gyula
> On Sat, 5 Nov 2022 at 11:46, Biao Geng  wrote:
> Hi Max,
> Thanks a lot for the FLIP. It is an extremely attractive feature!
> Just some follow up questions/thoughts after reading the FLIP:
> In the doc, the discussion of  the strategy of “scaling out” is
>> thorough
> and convincing to me but it seems that “scaling down” is less
>> discussed.
 I
> have 2 cents for this aspect:
> 1.  For source parallelisms, if the user configure a much larger value
> than normal, there should be very little pending records though it is
> possible to get optimized. But IIUC, in current algorithm, we will not
 take
> actions for this case as the backlog growth rate is almost zero. Is the
> understanding right?
> 2.  Compared with “scaling out”, “scaling in” is usually more dangerous
> as it is more likely to lead to negative influence to the downstream
 jobs.
> The min/max load bounds should be useful. I am 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Gyula Fóra
@JunRui:
There are 2 pieces that prevent scaling on minor load variations. Firstly
he algorithm / logic is intended to work on metrics averaged on a
configured time window (let's say last 5 minutes), this smoothens minor
variances and results in more stability. Secondly, in addition to the
utilization target (let's say 75%) there would be a configurable flexible
bound (for example +/- 10%) and as long as we are within the bound we
wouldn't trigger scaling. These 2 together should be enough we believe.

@Pedro:
Periodic metric collection is very important to get a good overview of the
system at any given time. Also we need some recent history to make a good
scaling decision. Reflecting on your suggestion, when the input lag
increases we are already too late to scale. Ideally the algorithm would
pick up on a gradual load increase before it even results in actual input
lag so we can always keep our target utilization level.

Cheers,
Gyula



On Sat, Nov 5, 2022 at 5:16 PM Pedro Silva  wrote:

> Hello,
>
> First of all thank you for tackling this theme, it is massive boon to
> Flink if it gets in.
>
> Following up on JunRui Lee’s question.
> Have you considered making metrics collection getting triggered based on
> events rather than periodic checks?
>
> I.e if input source lag is increasing for the past x amount of time ->
> trigger a metric collection to understand what to scale, if anything.
>
> For kubernetes loads there is KEDA that does this:
> https://keda.sh/docs/2.8/scalers/prometheus/
>
> My apologies if the question doesn’t make sense.
>
> Thank you for your time,
> Pedro Silva
>
> >
> > On 5 Nov 2022, at 08:09, JunRui Lee  wrote:
> >
> > Hi Max,
> >
> > Thanks for writing this FLIP and initiating the discussion.
> >
> > I just have a small question after reading the FLIP:
> >
> > In the document, I didn't find the definition of when to trigger
> > autoScaling after some jobVertex reach the threshold. If I missed is,
> > please let me know.
> > IIUC, the proper triggering rules are necessary to avoid unnecessary
> > autoscaling caused by temporary large changes in data,
> > and in this case, it will lead to at least two meaningless resubmissions
> of
> > jobs, which will negatively affect users.
> >
> > Thanks,
> > JunRui Lee
> >
> > Gyula Fóra  于2022年11月5日周六 20:38写道:
> >
> >> Hey!
> >> Thanks for the input!
> >> The algorithm does not really differentiate between scaling up or down
> as
> >> it’s concerned about finding the right parallelism to match the target
> >> processing rate with just enough spare capacity.
> >> Let me try to address your specific points:
> >> 1. The backlog growth rate only matters for computing the target
> processing
> >> rate for the sources. If the parallelism is high enough and there is no
> >> back pressure it will be close to 0 so the target rate is the source
> read
> >> rate. This is as intended. If we see that the sources are not busy and
> they
> >> can read more than enough the algorithm would scale them down.
> >> 2. You are right , it’s dangerous to scale in too much, so we already
> >> thought about limiting the scale down amount per scaling step/time
> window
> >> to give more safety. But we can definitely think about different
> strategies
> >> in the future!
> >> The observation regarding max parallelism is very important and we
> should
> >> always take that into consideration.
> >> Cheers
> >> Gyula
> >>> On Sat, 5 Nov 2022 at 11:46, Biao Geng  wrote:
> >>> Hi Max,
> >>> Thanks a lot for the FLIP. It is an extremely attractive feature!
> >>> Just some follow up questions/thoughts after reading the FLIP:
> >>> In the doc, the discussion of  the strategy of “scaling out” is
> thorough
> >>> and convincing to me but it seems that “scaling down” is less
> discussed.
> >> I
> >>> have 2 cents for this aspect:
> >>> 1.  For source parallelisms, if the user configure a much larger value
> >>> than normal, there should be very little pending records though it is
> >>> possible to get optimized. But IIUC, in current algorithm, we will not
> >> take
> >>> actions for this case as the backlog growth rate is almost zero. Is the
> >>> understanding right?
> >>> 2.  Compared with “scaling out”, “scaling in” is usually more dangerous
> >>> as it is more likely to lead to negative influence to the downstream
> >> jobs.
> >>> The min/max load bounds should be useful. I am wondering if it is
> >> possible
> >>> to have different strategy for “scaling in” to make it more
> conservative.
> >>> Or more eagerly, allow custom autoscaling strategy(e.g. time-based
> >>> strategy).
> >>> Another side thought is that to recover a job from
> checkpoint/savepoint,
> >>> the new parallelism cannot be larger than max parallelism defined in
> the
> >>> checkpoint(see this<
> >>
> https://github.com/apache/flink/blob/17a782c202c93343b8884cb52f4562f9c4ba593f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L128
> >>> ).
> >>> Not sure if this limit 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Pedro Silva
Hello,

First of all thank you for tackling this theme, it is massive boon to Flink if 
it gets in.

Following up on JunRui Lee’s question. 
Have you considered making metrics collection getting triggered based on events 
rather than periodic checks?

I.e if input source lag is increasing for the past x amount of time -> trigger 
a metric collection to understand what to scale, if anything.

For kubernetes loads there is KEDA that does this: 
https://keda.sh/docs/2.8/scalers/prometheus/

My apologies if the question doesn’t make sense.

Thank you for your time,
Pedro Silva

> 
> On 5 Nov 2022, at 08:09, JunRui Lee  wrote:
> 
> Hi Max,
> 
> Thanks for writing this FLIP and initiating the discussion.
> 
> I just have a small question after reading the FLIP:
> 
> In the document, I didn't find the definition of when to trigger
> autoScaling after some jobVertex reach the threshold. If I missed is,
> please let me know.
> IIUC, the proper triggering rules are necessary to avoid unnecessary
> autoscaling caused by temporary large changes in data,
> and in this case, it will lead to at least two meaningless resubmissions of
> jobs, which will negatively affect users.
> 
> Thanks,
> JunRui Lee
> 
> Gyula Fóra  于2022年11月5日周六 20:38写道:
> 
>> Hey!
>> Thanks for the input!
>> The algorithm does not really differentiate between scaling up or down as
>> it’s concerned about finding the right parallelism to match the target
>> processing rate with just enough spare capacity.
>> Let me try to address your specific points:
>> 1. The backlog growth rate only matters for computing the target processing
>> rate for the sources. If the parallelism is high enough and there is no
>> back pressure it will be close to 0 so the target rate is the source read
>> rate. This is as intended. If we see that the sources are not busy and they
>> can read more than enough the algorithm would scale them down.
>> 2. You are right , it’s dangerous to scale in too much, so we already
>> thought about limiting the scale down amount per scaling step/time window
>> to give more safety. But we can definitely think about different strategies
>> in the future!
>> The observation regarding max parallelism is very important and we should
>> always take that into consideration.
>> Cheers
>> Gyula
>>> On Sat, 5 Nov 2022 at 11:46, Biao Geng  wrote:
>>> Hi Max,
>>> Thanks a lot for the FLIP. It is an extremely attractive feature!
>>> Just some follow up questions/thoughts after reading the FLIP:
>>> In the doc, the discussion of  the strategy of “scaling out” is thorough
>>> and convincing to me but it seems that “scaling down” is less discussed.
>> I
>>> have 2 cents for this aspect:
>>> 1.  For source parallelisms, if the user configure a much larger value
>>> than normal, there should be very little pending records though it is
>>> possible to get optimized. But IIUC, in current algorithm, we will not
>> take
>>> actions for this case as the backlog growth rate is almost zero. Is the
>>> understanding right?
>>> 2.  Compared with “scaling out”, “scaling in” is usually more dangerous
>>> as it is more likely to lead to negative influence to the downstream
>> jobs.
>>> The min/max load bounds should be useful. I am wondering if it is
>> possible
>>> to have different strategy for “scaling in” to make it more conservative.
>>> Or more eagerly, allow custom autoscaling strategy(e.g. time-based
>>> strategy).
>>> Another side thought is that to recover a job from checkpoint/savepoint,
>>> the new parallelism cannot be larger than max parallelism defined in the
>>> checkpoint(see this<
>> https://github.com/apache/flink/blob/17a782c202c93343b8884cb52f4562f9c4ba593f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L128
>>> ).
>>> Not sure if this limit should be mentioned in the FLIP.
>>> Again, thanks for the great work and looking forward to using flink k8s
>>> operator with it!
>>> Best,
>>> Biao Geng
>>> From: Maximilian Michels 
>>> Date: Saturday, November 5, 2022 at 2:37 AM
>>> To: dev 
>>> Cc: Gyula Fóra , Thomas Weise ,
>>> Marton Balassi , Őrhidi Mátyás <
>>> matyas.orh...@gmail.com>
>>> Subject: [DISCUSS] FLIP-271: Autoscaling
>>> Hi,
>>> I would like to kick off the discussion on implementing autoscaling for
>>> Flink as part of the Flink Kubernetes operator. I've outlined an approach
>>> here which I find promising:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>>> I've been discussing this approach with some of the operator
>> contributors:
>>> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
>>> implementation based on the current FLIP design. If that goes well, we
>>> would like to contribute this to Flink based on the results of the
>>> discussion here.
>>> I'm curious to hear your thoughts.
>>> -Max


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread JunRui Lee
Hi Max,

Thanks for writing this FLIP and initiating the discussion.

I just have a small question after reading the FLIP:

In the document, I didn't find the definition of when to trigger
autoScaling after some jobVertex reach the threshold. If I missed is,
please let me know.
IIUC, the proper triggering rules are necessary to avoid unnecessary
autoscaling caused by temporary large changes in data,
and in this case, it will lead to at least two meaningless resubmissions of
jobs, which will negatively affect users.

Thanks,
JunRui Lee

Gyula Fóra  于2022年11月5日周六 20:38写道:

> Hey!
>
> Thanks for the input!
>
> The algorithm does not really differentiate between scaling up or down as
> it’s concerned about finding the right parallelism to match the target
> processing rate with just enough spare capacity.
>
> Let me try to address your specific points:
>
> 1. The backlog growth rate only matters for computing the target processing
> rate for the sources. If the parallelism is high enough and there is no
> back pressure it will be close to 0 so the target rate is the source read
> rate. This is as intended. If we see that the sources are not busy and they
> can read more than enough the algorithm would scale them down.
>
> 2. You are right , it’s dangerous to scale in too much, so we already
> thought about limiting the scale down amount per scaling step/time window
> to give more safety. But we can definitely think about different strategies
> in the future!
>
> The observation regarding max parallelism is very important and we should
> always take that into consideration.
>
> Cheers
> Gyula
>
> On Sat, 5 Nov 2022 at 11:46, Biao Geng  wrote:
>
> > Hi Max,
> >
> > Thanks a lot for the FLIP. It is an extremely attractive feature!
> >
> > Just some follow up questions/thoughts after reading the FLIP:
> > In the doc, the discussion of  the strategy of “scaling out” is thorough
> > and convincing to me but it seems that “scaling down” is less discussed.
> I
> > have 2 cents for this aspect:
> >
> >   1.  For source parallelisms, if the user configure a much larger value
> > than normal, there should be very little pending records though it is
> > possible to get optimized. But IIUC, in current algorithm, we will not
> take
> > actions for this case as the backlog growth rate is almost zero. Is the
> > understanding right?
> >   2.  Compared with “scaling out”, “scaling in” is usually more dangerous
> > as it is more likely to lead to negative influence to the downstream
> jobs.
> > The min/max load bounds should be useful. I am wondering if it is
> possible
> > to have different strategy for “scaling in” to make it more conservative.
> > Or more eagerly, allow custom autoscaling strategy(e.g. time-based
> > strategy).
> >
> > Another side thought is that to recover a job from checkpoint/savepoint,
> > the new parallelism cannot be larger than max parallelism defined in the
> > checkpoint(see this<
> >
> https://github.com/apache/flink/blob/17a782c202c93343b8884cb52f4562f9c4ba593f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L128
> >).
> > Not sure if this limit should be mentioned in the FLIP.
> >
> > Again, thanks for the great work and looking forward to using flink k8s
> > operator with it!
> >
> > Best,
> > Biao Geng
> >
> > From: Maximilian Michels 
> > Date: Saturday, November 5, 2022 at 2:37 AM
> > To: dev 
> > Cc: Gyula Fóra , Thomas Weise ,
> > Marton Balassi , Őrhidi Mátyás <
> > matyas.orh...@gmail.com>
> > Subject: [DISCUSS] FLIP-271: Autoscaling
> > Hi,
> >
> > I would like to kick off the discussion on implementing autoscaling for
> > Flink as part of the Flink Kubernetes operator. I've outlined an approach
> > here which I find promising:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
> >
> > I've been discussing this approach with some of the operator
> contributors:
> > Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> > implementation based on the current FLIP design. If that goes well, we
> > would like to contribute this to Flink based on the results of the
> > discussion here.
> >
> > I'm curious to hear your thoughts.
> >
> > -Max
> >
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Gyula Fóra
Hey!

Thanks for the input!

The algorithm does not really differentiate between scaling up or down as
it’s concerned about finding the right parallelism to match the target
processing rate with just enough spare capacity.

Let me try to address your specific points:

1. The backlog growth rate only matters for computing the target processing
rate for the sources. If the parallelism is high enough and there is no
back pressure it will be close to 0 so the target rate is the source read
rate. This is as intended. If we see that the sources are not busy and they
can read more than enough the algorithm would scale them down.

2. You are right , it’s dangerous to scale in too much, so we already
thought about limiting the scale down amount per scaling step/time window
to give more safety. But we can definitely think about different strategies
in the future!

The observation regarding max parallelism is very important and we should
always take that into consideration.

Cheers
Gyula

On Sat, 5 Nov 2022 at 11:46, Biao Geng  wrote:

> Hi Max,
>
> Thanks a lot for the FLIP. It is an extremely attractive feature!
>
> Just some follow up questions/thoughts after reading the FLIP:
> In the doc, the discussion of  the strategy of “scaling out” is thorough
> and convincing to me but it seems that “scaling down” is less discussed. I
> have 2 cents for this aspect:
>
>   1.  For source parallelisms, if the user configure a much larger value
> than normal, there should be very little pending records though it is
> possible to get optimized. But IIUC, in current algorithm, we will not take
> actions for this case as the backlog growth rate is almost zero. Is the
> understanding right?
>   2.  Compared with “scaling out”, “scaling in” is usually more dangerous
> as it is more likely to lead to negative influence to the downstream jobs.
> The min/max load bounds should be useful. I am wondering if it is possible
> to have different strategy for “scaling in” to make it more conservative.
> Or more eagerly, allow custom autoscaling strategy(e.g. time-based
> strategy).
>
> Another side thought is that to recover a job from checkpoint/savepoint,
> the new parallelism cannot be larger than max parallelism defined in the
> checkpoint(see this<
> https://github.com/apache/flink/blob/17a782c202c93343b8884cb52f4562f9c4ba593f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L128>).
> Not sure if this limit should be mentioned in the FLIP.
>
> Again, thanks for the great work and looking forward to using flink k8s
> operator with it!
>
> Best,
> Biao Geng
>
> From: Maximilian Michels 
> Date: Saturday, November 5, 2022 at 2:37 AM
> To: dev 
> Cc: Gyula Fóra , Thomas Weise ,
> Marton Balassi , Őrhidi Mátyás <
> matyas.orh...@gmail.com>
> Subject: [DISCUSS] FLIP-271: Autoscaling
> Hi,
>
> I would like to kick off the discussion on implementing autoscaling for
> Flink as part of the Flink Kubernetes operator. I've outlined an approach
> here which I find promising:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>
> I've been discussing this approach with some of the operator contributors:
> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> implementation based on the current FLIP design. If that goes well, we
> would like to contribute this to Flink based on the results of the
> discussion here.
>
> I'm curious to hear your thoughts.
>
> -Max
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Biao Geng
Hi Max,

Thanks a lot for the FLIP. It is an extremely attractive feature!

Just some follow up questions/thoughts after reading the FLIP:
In the doc, the discussion of  the strategy of “scaling out” is thorough and 
convincing to me but it seems that “scaling down” is less discussed. I have 2 
cents for this aspect:

  1.  For source parallelisms, if the user configure a much larger value than 
normal, there should be very little pending records though it is possible to 
get optimized. But IIUC, in current algorithm, we will not take actions for 
this case as the backlog growth rate is almost zero. Is the understanding right?
  2.  Compared with “scaling out”, “scaling in” is usually more dangerous as it 
is more likely to lead to negative influence to the downstream jobs. The 
min/max load bounds should be useful. I am wondering if it is possible to have 
different strategy for “scaling in” to make it more conservative. Or more 
eagerly, allow custom autoscaling strategy(e.g. time-based strategy).

Another side thought is that to recover a job from checkpoint/savepoint, the 
new parallelism cannot be larger than max parallelism defined in the 
checkpoint(see 
this).
 Not sure if this limit should be mentioned in the FLIP.

Again, thanks for the great work and looking forward to using flink k8s 
operator with it!

Best,
Biao Geng

From: Maximilian Michels 
Date: Saturday, November 5, 2022 at 2:37 AM
To: dev 
Cc: Gyula Fóra , Thomas Weise , Marton 
Balassi , Őrhidi Mátyás 
Subject: [DISCUSS] FLIP-271: Autoscaling
Hi,

I would like to kick off the discussion on implementing autoscaling for
Flink as part of the Flink Kubernetes operator. I've outlined an approach
here which I find promising:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling

I've been discussing this approach with some of the operator contributors:
Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
implementation based on the current FLIP design. If that goes well, we
would like to contribute this to Flink based on the results of the
discussion here.

I'm curious to hear your thoughts.

-Max


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Márton Balassi
Thanks for preparing the FLIP and kicking off the discussion, Max. Looking
forward to this. :-)

On Sat, Nov 5, 2022 at 9:27 AM Niels Basjes  wrote:

> I'm really looking forward to seeing this in action.
>
> Niels
>
> On Fri, 4 Nov 2022, 19:37 Maximilian Michels,  wrote:
>
>> Hi,
>>
>> I would like to kick off the discussion on implementing autoscaling for
>> Flink as part of the Flink Kubernetes operator. I've outlined an approach
>> here which I find promising:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>>
>> I've been discussing this approach with some of the operator contributors:
>> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
>> implementation based on the current FLIP design. If that goes well, we
>> would like to contribute this to Flink based on the results of the
>> discussion here.
>>
>> I'm curious to hear your thoughts.
>>
>> -Max
>>
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-05 Thread Niels Basjes
I'm really looking forward to seeing this in action.

Niels

On Fri, 4 Nov 2022, 19:37 Maximilian Michels,  wrote:

> Hi,
>
> I would like to kick off the discussion on implementing autoscaling for
> Flink as part of the Flink Kubernetes operator. I've outlined an approach
> here which I find promising:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>
> I've been discussing this approach with some of the operator contributors:
> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> implementation based on the current FLIP design. If that goes well, we
> would like to contribute this to Flink based on the results of the
> discussion here.
>
> I'm curious to hear your thoughts.
>
> -Max
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-04 Thread Őrhidi Mátyás
Thank you Max, Gyula!

This is definitely an exciting one :)

Cheers,
Matyas

On Fri, Nov 4, 2022 at 1:16 PM Gyula Fóra  wrote:

> Hi!
>
> Thank you for the proposal Max! It is great to see this highly desired
> feature finally take shape.
>
> I think we have all the right building blocks to make this successful.
>
> Cheers,
> Gyula
>
> On Fri, Nov 4, 2022 at 7:37 PM Maximilian Michels  wrote:
>
>> Hi,
>>
>> I would like to kick off the discussion on implementing autoscaling for
>> Flink as part of the Flink Kubernetes operator. I've outlined an approach
>> here which I find promising:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>>
>> I've been discussing this approach with some of the operator
>> contributors: Gyula, Marton, Matyas, and Thomas (all in CC). We started
>> prototyping an implementation based on the current FLIP design. If that
>> goes well, we would like to contribute this to Flink based on the results
>> of the discussion here.
>>
>> I'm curious to hear your thoughts.
>>
>> -Max
>>
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-04 Thread Gyula Fóra
Hi!

Thank you for the proposal Max! It is great to see this highly desired
feature finally take shape.

I think we have all the right building blocks to make this successful.

Cheers,
Gyula

On Fri, Nov 4, 2022 at 7:37 PM Maximilian Michels  wrote:

> Hi,
>
> I would like to kick off the discussion on implementing autoscaling for
> Flink as part of the Flink Kubernetes operator. I've outlined an approach
> here which I find promising:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>
> I've been discussing this approach with some of the operator contributors:
> Gyula, Marton, Matyas, and Thomas (all in CC). We started prototyping an
> implementation based on the current FLIP design. If that goes well, we
> would like to contribute this to Flink based on the results of the
> discussion here.
>
> I'm curious to hear your thoughts.
>
> -Max
>