Re: [ANNOUNCE] New Apache Flink PMC Member - Fan Rui

2024-06-06 Thread Xia Sun
Congratulations, Rui!

Best,
Xia

Paul Lam  wrote on Thu, Jun 6, 2024 at 11:59:

> Congrats, Rui!
>
> Best,
> Paul Lam
>
> > 2024年6月6日 11:02,Junrui Lee  写道:
> >
> > Congratulations, Rui.
> >
> > Best,
> > Junrui
> >
> > Hang Ruan  wrote on Thu, Jun 6, 2024 at 10:35:
> >
> >> Congratulations, Rui!
> >>
> >> Best,
> >> Hang
> >>
> >> Samrat Deb  wrote on Thu, Jun 6, 2024 at 10:28:
> >>
> >>> Congratulations Rui
> >>>
> >>> Bests,
> >>> Samrat
> >>>
> >>> On Thu, 6 Jun 2024 at 7:45 AM, Yuxin Tan  wrote:
> >>>
>  Congratulations, Rui!
> 
>  Best,
>  Yuxin
> 
> 
>  Xuannan Su  wrote on Thu, Jun 6, 2024 at 09:58:
> 
> > Congratulations!
> >
> > Best regards,
> > Xuannan
> >
> > On Thu, Jun 6, 2024 at 9:53 AM Hangxiang Yu  wrote:
> >>
> >> Congratulations, Rui !
> >>
> >> On Thu, Jun 6, 2024 at 9:18 AM Lincoln Lee  wrote:
> >>
> >>> Congratulations, Rui!
> >>>
> >>> Best,
> >>> Lincoln Lee
> >>>
> >>>
> >>> Lijie Wang  wrote on Thu, Jun 6, 2024 at 09:11:
> >>>
>  Congratulations, Rui!
> 
>  Best,
>  Lijie
> 
>  Rodrigo Meneses  wrote on Wed, Jun 5, 2024 at 21:35:
> 
> > All the best
> >
> > On Wed, Jun 5, 2024 at 5:56 AM xiangyu feng <xiangyu...@gmail.com> wrote:
> >
> >> Congratulations, Rui!
> >>
> >> Regards,
> >> Xiangyu Feng
> >>
> >> Feng Jin  wrote on Wed, Jun 5, 2024 at 20:42:
> >>
> >>> Congratulations, Rui!
> >>>
> >>>
> >>> Best,
> >>> Feng Jin
> >>>
> >>> On Wed, Jun 5, 2024 at 8:23 PM Yanfei Lei <fredia...@gmail.com> wrote:
> >>>
>  Congratulations, Rui!
> 
>  Best,
>  Yanfei
> 
>  Luke Chen  wrote on Wed, Jun 5, 2024 at 20:08:
> >
> > Congrats, Rui!
> >
> > Luke
> >
> > On Wed, Jun 5, 2024 at 8:02 PM Jiabao Sun <jiabao...@apache.org> wrote:
> >
> >> Congrats, Rui. Well-deserved!
> >>
> >> Best,
> >> Jiabao
> >>
> >> Zhanghao Chen  wrote on Wed, Jun 5, 2024 at 19:29:
> >>
> >>> Congrats, Rui!
> >>>
> >>> Best,
> >>> Zhanghao Chen
> >>> ________________________________
> >>> From: Piotr Nowojski 
> >>> Sent: Wednesday, June 5, 2024 18:01
> >>> To: dev ; rui fan <1996fan...@gmail.com>
> >>> Subject: [ANNOUNCE] New Apache Flink PMC Member - Fan Rui
> >>>
> >>> Hi everyone,
> >>>
> >>> On behalf of the PMC, I'm very happy to announce another new Apache
> >>> Flink PMC Member - Fan Rui.
> >>>
> >>> Rui has been active in the community since August 2019. During this
> >>> time he has contributed a lot of new features. Among others:
> >>>  - Decoupling the Autoscaler from the Kubernetes Operator, and
> >>>    supporting the Standalone Autoscaler
> >>>  - Improvements to checkpointing, flamegraphs, restart strategies,
> >>>    watermark alignment, and network shuffles
> >>>  - Optimizing the memory and CPU usage of large operators, greatly
> >>>    reducing the risk and probability of TaskManager OOM
> >>>
> >>> He reviewed a significant number of PRs and has been active both on the
> >>> mailing lists and in Jira, helping to both maintain and grow Apache
> >>> Flink's community. He is also our current Flink 1.20 release manager.
> >>>
> >>> In the last 12 months, Rui has been the most active contributor in the
> >>> Flink Kubernetes Operator project, while being the 2nd most active
> >>> Flink contributor at the same time.
> >>>
> >>> Please join me in welcoming and congratulating Fan Rui!
> >>>
> >>> Best,
> >>> Piotrek (on behalf of the Flink PMC)
> >>>
> >>
> 
> >>>
> >>
> >
> 
> >>>
> >>
> >>
> >> --
> >> Best,
> >> Hangxiang.
> >
> 
> >>>
> >>
>
>


[RESULT][VOTE] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-05-05 Thread Xia Sun
Hi all,

FLIP-445: Support dynamic parallelism inference for HiveSource[1] has been
accepted; the vote is recorded in this thread [2].

The proposal received 6 approving votes (5 binding) and no disapprovals:

- Muhammet Orazov (non-binding)
- Rui Fan (binding)
- Ron Liu (binding)
- Zhu Zhu (binding)
- Lijie Wang (binding)
- yuxia (binding)

Thanks to all involved.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
[2] https://lists.apache.org/thread/lktnb162l2z3042m76to6xfbsdndy4r7

Best,
Xia


[VOTE] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-25 Thread Xia Sun
Hi everyone,

I'd like to start a vote on FLIP-445: Support dynamic parallelism inference
for HiveSource[1] which has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an objection or
not enough votes.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
[2] https://lists.apache.org/thread/4k1qx6lodhbkknkhjyl0lq9bx8fcpjvn


Best,
Xia


Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-25 Thread Xia Sun
Hi Venkat,

Thanks for joining the discussion.
In our understanding, a significant number of existing jobs still use Hive.
Many companies are indeed migrating their data to the lakehouse, but for
historical reasons a substantial amount of data still resides in Hive.

Best,
Xia

Venkatakrishnan Sowrirajan  wrote on Thu, Apr 25, 2024 at 11:52:

> Hi Xia,
>
> +1 on introducing dynamic parallelism inference for HiveSource.
>
> Orthogonal to this discussion, curious, how commonly HiveSource is used
> these days in the industry given the popularity of table formats/sources
> like Iceberg, Hudi and Delta lake?
>
> Thanks
> Venkat
>
> On Wed, Apr 24, 2024, 7:41 PM Xia Sun  wrote:
>
> > Hi everyone,
> >
> > Thanks for all the feedback!
> >
> > If there are no more comments, I would like to start the vote thread,
> > thanks again!
> >
> > Best,
> > Xia
> >
> > Ahmed Hamdy  wrote on Thu, Apr 18, 2024 at 21:31:
> >
> > > Hi Xia,
> > > I have read through the FLIP and discussion and the new version of the
> > FLIP
> > > looks better.
> > > +1 for the proposal.
> > > Best Regards
> > > Ahmed Hamdy
> > >
> > >
> > > On Thu, 18 Apr 2024 at 12:21, Ron Liu  wrote:
> > >
> > > > Hi, Xia
> > > >
> > > > Thanks for updating, looks good to me.
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > Xia Sun  wrote on Thu, Apr 18, 2024 at 19:11:
> > > >
> > > > > Hi Ron,
> > > > > Yes, presenting it in a table might be more intuitive. I have
> already
> > > > added
> > > > > the table in the "Public Interfaces | New Config Option" chapter of
> > > FLIP.
> > > > > PTAL~
> > > > >
> > > > > Ron Liu  wrote on Thu, Apr 18, 2024 at 18:10:
> > > > >
> > > > > > Hi, Xia
> > > > > >
> > > > > > Thanks for your reply.
> > > > > >
> > > > > > > That means, in terms
> > > > > > of priority, `table.exec.hive.infer-source-parallelism` >
> > > > > > `table.exec.hive.infer-source-parallelism.mode`.
> > > > > >
> > > > > > I still have some confusion: if
> > > > > > `table.exec.hive.infer-source-parallelism` takes priority over
> > > > > > `table.exec.hive.infer-source-parallelism.mode`, and
> > > > > > `table.exec.hive.infer-source-parallelism` currently defaults to
> > > > > > true, does that mean static parallelism inference always takes
> > > > > > effect? Or perhaps after this FLIP we change the default behavior
> > > > > > of `table.exec.hive.infer-source-parallelism` to indicate dynamic
> > > > > > parallelism inference when enabled.
> > > > > > I think you should list in the FLIP, in a table, the behaviors of
> > > > > > these two options when they coexist; only then can users know how
> > > > > > dynamic and static parallelism inference work.
> > > > > >
> > > > > > Best,
> > > > > > Ron
> > > > > >
> > > > > > Xia Sun  wrote on Thu, Apr 18, 2024 at 16:33:
> > > > > >
> > > > > > > Hi Ron and Lijie,
> > > > > > > Thanks for joining the discussion and sharing your suggestions.
> > > > > > >
> > > > > > > > the InferMode class should also be introduced in the Public
> > > > > Interfaces
> > > > > > > > section!
> > > > > > >
> > > > > > >
> > > > > > > Thanks for the reminder, I have now added the InferMode class
> to
> > > the
> > > > > > Public
> > > > > > > Interfaces section as well.
> > > > > > >
> > > > > > > > `table.exec.hive.infer-source-parallelism.max` is 1024, I
> > checked
> > > > > > through
> > > > > > > > the code that the default value is 1000?
> > > > > > >
> > > > > > >
> > > > > > > I have checked and the default value of
> > > > > > > `table.exec.hive.infer-source-parallelism.max` is indeed 1000.
> > This
> > > >

Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-24 Thread Xia Sun
Hi everyone,

Thanks for all the feedback!

If there are no more comments, I would like to start the vote thread,
thanks again!

Best,
Xia

Ahmed Hamdy  wrote on Thu, Apr 18, 2024 at 21:31:

> Hi Xia,
> I have read through the FLIP and discussion and the new version of the FLIP
> looks better.
> +1 for the proposal.
> Best Regards
> Ahmed Hamdy
>
>
> On Thu, 18 Apr 2024 at 12:21, Ron Liu  wrote:
>
> > Hi, Xia
> >
> > Thanks for updating, looks good to me.
> >
> > Best,
> > Ron
> >
> > Xia Sun  wrote on Thu, Apr 18, 2024 at 19:11:
> >
> > > Hi Ron,
> > > Yes, presenting it in a table might be more intuitive. I have already
> > added
> > > the table in the "Public Interfaces | New Config Option" chapter of
> FLIP.
> > > PTAL~
> > >
> > > Ron Liu  wrote on Thu, Apr 18, 2024 at 18:10:
> > >
> > > > Hi, Xia
> > > >
> > > > Thanks for your reply.
> > > >
> > > > > That means, in terms
> > > > of priority, `table.exec.hive.infer-source-parallelism` >
> > > > `table.exec.hive.infer-source-parallelism.mode`.
> > > >
> > > > I still have some confusion: if
> > > > `table.exec.hive.infer-source-parallelism` takes priority over
> > > > `table.exec.hive.infer-source-parallelism.mode`, and
> > > > `table.exec.hive.infer-source-parallelism` currently defaults to true,
> > > > does that mean static parallelism inference always takes effect? Or
> > > > perhaps after this FLIP we change the default behavior of
> > > > `table.exec.hive.infer-source-parallelism` to indicate dynamic
> > > > parallelism inference when enabled.
> > > > I think you should list in the FLIP, in a table, the behaviors of these
> > > > two options when they coexist; only then can users know how dynamic and
> > > > static parallelism inference work.
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > Xia Sun  wrote on Thu, Apr 18, 2024 at 16:33:
> > > >
> > > > > Hi Ron and Lijie,
> > > > > Thanks for joining the discussion and sharing your suggestions.
> > > > >
> > > > > > the InferMode class should also be introduced in the Public
> > > Interfaces
> > > > > > section!
> > > > >
> > > > >
> > > > > Thanks for the reminder, I have now added the InferMode class to
> the
> > > > Public
> > > > > Interfaces section as well.
> > > > >
> > > > > > `table.exec.hive.infer-source-parallelism.max` is 1024, I checked
> > > > through
> > > > > > the code that the default value is 1000?
> > > > >
> > > > >
> > > > > I have checked and the default value of
> > > > > `table.exec.hive.infer-source-parallelism.max` is indeed 1000. This
> > has
> > > > > been corrected in the FLIP.
> > > > >
> > > > > > how are`table.exec.hive.infer-source-parallelism` and
> > > > > > `table.exec.hive.infer-source-parallelism.mode` compatible?
> > > > >
> > > > >
> > > > > This is indeed a critical point. The current plan is to deprecate
> > > > > `table.exec.hive.infer-source-parallelism` but still utilize it as
> > the
> > > > main
> > > > > switch for enabling automatic parallelism inference. That means, in
> > > terms
> > > > > of priority, `table.exec.hive.infer-source-parallelism` >
> > > > > `table.exec.hive.infer-source-parallelism.mode`. In future
> versions,
> > if
> > > > > `table.exec.hive.infer-source-parallelism` is removed, this logic
> > will
> > > > also
> > > > > need to be revised, leaving only
> > > > > `table.exec.hive.infer-source-parallelism.mode` as the basis for
> > > deciding
> > > > > whether to enable parallelism inference. I have also added this
> > > > description
> > > > > to the FLIP.
> > > > >
> > > > >
> > > > > > In FLIP-367 it is supported to be able to set the Source's
> > > parallelism
> > > > > > individually, if in the future HiveSource also supports this
> > feature,
> > > > > > however, the default value of
> > > > > > `table.exec.hive.infer-source-parallelism.mode` is
> > 

Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-18 Thread Xia Sun
Hi Ron,
Yes, presenting it in a table might be more intuitive. I have already added
the table in the "Public Interfaces | New Config Option" chapter of FLIP.
PTAL~

Ron Liu  wrote on Thu, Apr 18, 2024 at 18:10:

> Hi, Xia
>
> Thanks for your reply.
>
> > That means, in terms
> of priority, `table.exec.hive.infer-source-parallelism` >
> `table.exec.hive.infer-source-parallelism.mode`.
>
> I still have some confusion: if `table.exec.hive.infer-source-parallelism`
> takes priority over `table.exec.hive.infer-source-parallelism.mode`, and
> `table.exec.hive.infer-source-parallelism` currently defaults to true, does
> that mean static parallelism inference always takes effect? Or perhaps after
> this FLIP we change the default behavior of
> `table.exec.hive.infer-source-parallelism` to indicate dynamic parallelism
> inference when enabled.
> I think you should list in the FLIP, in a table, the behaviors of these two
> options when they coexist; only then can users know how dynamic and static
> parallelism inference work.
>
> Best,
> Ron
>
> Xia Sun  wrote on Thu, Apr 18, 2024 at 16:33:
>
> > Hi Ron and Lijie,
> > Thanks for joining the discussion and sharing your suggestions.
> >
> > > the InferMode class should also be introduced in the Public Interfaces
> > > section!
> >
> >
> > Thanks for the reminder, I have now added the InferMode class to the
> Public
> > Interfaces section as well.
> >
> > > `table.exec.hive.infer-source-parallelism.max` is 1024, I checked
> through
> > > the code that the default value is 1000?
> >
> >
> > I have checked and the default value of
> > `table.exec.hive.infer-source-parallelism.max` is indeed 1000. This has
> > been corrected in the FLIP.
> >
> > > how are`table.exec.hive.infer-source-parallelism` and
> > > `table.exec.hive.infer-source-parallelism.mode` compatible?
> >
> >
> > This is indeed a critical point. The current plan is to deprecate
> > `table.exec.hive.infer-source-parallelism` but still utilize it as the
> main
> > switch for enabling automatic parallelism inference. That means, in terms
> > of priority, `table.exec.hive.infer-source-parallelism` >
> > `table.exec.hive.infer-source-parallelism.mode`. In future versions, if
> > `table.exec.hive.infer-source-parallelism` is removed, this logic will
> also
> > need to be revised, leaving only
> > `table.exec.hive.infer-source-parallelism.mode` as the basis for deciding
> > whether to enable parallelism inference. I have also added this
> description
> > to the FLIP.
> >
> >
> > > In FLIP-367 it is supported to set the Source's parallelism individually.
> > > If in the future HiveSource also supports this feature, given that the
> > > default value of `table.exec.hive.infer-source-parallelism.mode` is
> > > `InferMode.DYNAMIC`, will the parallelism be dynamically derived or will
> > > the manually set parallelism take effect, and which has the higher
> > > priority?
> >
> >
> > From my understanding, 'manually set parallelism' has the higher
> priority,
> > just like one of the preconditions for the effectiveness of dynamic
> > parallelism inference in the AdaptiveBatchScheduler is that the vertex's
> > parallelism isn't set. I believe whether it's static inference or dynamic
> > inference, the manually set parallelism by the user should be respected.
> >
> > > The `InferMode.NONE` option.
> >
> > Currently, 'adding InferMode.NONE' seems to be the prevailing opinion. I
> > will add InferMode.NONE as one of the Enum options in InferMode class.
> >
> > Best,
> > Xia
> >
> > Lijie Wang  wrote on Thu, Apr 18, 2024 at 13:50:
> >
> > > Thanks for driving the discussion.
> > >
> > > +1 for the proposal and +1 for the `InferMode.NONE` option.
> > >
> > > Best,
> > > Lijie
> > >
> > > Ron liu  wrote on Thu, Apr 18, 2024 at 11:36:
> > >
> > > > Hi, Xia
> > > >
> > > > Thanks for driving this FLIP.
> > > >
> > > > This proposal looks good to me overall. However, I have the following
> > > minor
> > > > questions:
> > > >
> > > > 1. FLIP introduced `table.exec.hive.infer-source-parallelism.mode`
> as a
> > > new
> > > > parameter, and the value is the enum class `InferMode`, I think the
> > > > InferMode class should also be introduced in the Public Interfaces
> > > section!
> > > > 2. You mentioned in FLIP that the default value of
> > > > `table.exec.hive.infer-source-parallelism.max` is 1024, I checked
> > > > through the code that the default value is 1000?

Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-18 Thread Xia Sun
Hi Ron and Lijie,
Thanks for joining the discussion and sharing your suggestions.

> the InferMode class should also be introduced in the Public Interfaces
> section!


Thanks for the reminder, I have now added the InferMode class to the Public
Interfaces section as well.

> `table.exec.hive.infer-source-parallelism.max` is 1024, I checked through
> the code that the default value is 1000?


I have checked and the default value of
`table.exec.hive.infer-source-parallelism.max` is indeed 1000. This has
been corrected in the FLIP.

> how are`table.exec.hive.infer-source-parallelism` and
> `table.exec.hive.infer-source-parallelism.mode` compatible?


This is indeed a critical point. The current plan is to deprecate
`table.exec.hive.infer-source-parallelism` but still utilize it as the main
switch for enabling automatic parallelism inference. That means, in terms
of priority, `table.exec.hive.infer-source-parallelism` >
`table.exec.hive.infer-source-parallelism.mode`. In future versions, if
`table.exec.hive.infer-source-parallelism` is removed, this logic will also
need to be revised, leaving only
`table.exec.hive.infer-source-parallelism.mode` as the basis for deciding
whether to enable parallelism inference. I have also added this description
to the FLIP.


> In FLIP-367 it is supported to set the Source's parallelism individually.
> If in the future HiveSource also supports this feature, given that the
> default value of `table.exec.hive.infer-source-parallelism.mode` is
> `InferMode.DYNAMIC`, will the parallelism be dynamically derived or will the
> manually set parallelism take effect, and which has the higher priority?


From my understanding, 'manually set parallelism' has the higher priority,
just like one of the preconditions for the effectiveness of dynamic
parallelism inference in the AdaptiveBatchScheduler is that the vertex's
parallelism isn't set. I believe whether it's static inference or dynamic
inference, the manually set parallelism by the user should be respected.

> The `InferMode.NONE` option.

Currently, 'adding InferMode.NONE' seems to be the prevailing opinion. I
will add InferMode.NONE as one of the Enum options in InferMode class.
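
To make the intended interplay concrete, below is a minimal, self-contained
Java sketch of the resolution order described above. The enum values mirror
the proposed InferMode (including the new NONE); the class, field, and method
names are illustrative assumptions, not the FLIP's final API.

// Sketch only: the deprecated main switch outranks the new mode option,
// and a user-specified parallelism outranks any inference.
enum InferMode { STATIC, DYNAMIC, NONE }

final class SourceParallelismResolution {

    // 'legacySwitchEnabled' models table.exec.hive.infer-source-parallelism;
    // 'mode' models table.exec.hive.infer-source-parallelism.mode.
    static InferMode effectiveMode(boolean legacySwitchEnabled, InferMode mode) {
        if (!legacySwitchEnabled) {
            return InferMode.NONE; // the deprecated switch keeps the higher priority
        }
        return mode;
    }

    // A user-specified parallelism (> 0) is respected regardless of the mode.
    static boolean inferenceApplies(int userSpecifiedParallelism, InferMode mode) {
        return userSpecifiedParallelism <= 0 && mode != InferMode.NONE;
    }
}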

Best,
Xia

Lijie Wang  wrote on Thu, Apr 18, 2024 at 13:50:

> Thanks for driving the discussion.
>
> +1 for the proposal and +1 for the `InferMode.NONE` option.
>
> Best,
> Lijie
>
> Ron liu  wrote on Thu, Apr 18, 2024 at 11:36:
>
> > Hi, Xia
> >
> > Thanks for driving this FLIP.
> >
> > This proposal looks good to me overall. However, I have the following
> minor
> > questions:
> >
> > 1. FLIP introduced `table.exec.hive.infer-source-parallelism.mode` as a
> new
> > parameter, and the value is the enum class `InferMode`, I think the
> > InferMode class should also be introduced in the Public Interfaces
> section!
> > 2. You mentioned in FLIP that the default value of
> > `table.exec.hive.infer-source-parallelism.max` is 1024, I checked through
> > the code that the default value is 1000?
> > 3. I also agree with Muhammet's idea that there is no need to introduce
> the
> > option `table.exec.hive.infer-source-parallelism.enabled`, and that
> > expanding the InferMode values will fulfill the need. There is another
> > issue to consider here though, how are
> > `table.exec.hive.infer-source-parallelism` and
> > `table.exec.hive.infer-source-parallelism.mode` compatible?
> > 4. In FLIP-367 it is supported to set the Source's parallelism
> > individually. If in the future HiveSource also supports this feature,
> > given that the default value of
> > `table.exec.hive.infer-source-parallelism.mode` is `InferMode.DYNAMIC`,
> > will the parallelism be dynamically derived or will the manually set
> > parallelism take effect, and which has the higher priority?
> >
> > Best,
> > Ron
> >
> > Xia Sun  wrote on Wed, Apr 17, 2024 at 12:08:
> >
> > > Hi Jeyhun, Muhammet,
> > > Thanks for all the feedback!
> > >
> > > > Could you please mention the default values for the new
> configurations
> > > > (e.g., table.exec.hive.infer-source-parallelism.mode,
> > > > table.exec.hive.infer-source-parallelism.enabled,
> > > > etc) ?
> > >
> > >
> > > Thanks for your suggestion. I have supplemented the explanation
> regarding
> > > the default values.
> > >
> > > > Since we are introducing the mode as a configuration option,
> > > > could it make sense to have `InferMode.NONE` option also?
> > > > The `NONE` option would disable the inference.
> > >
> > >
> > > This is a good idea. Looking ahead, it could eliminate the need for
> > > introducing a new configuration option. I haven't identified any
> > > potential compatibility issues as yet. If there are no further ideas
> > > from others, I'll go ahead and update the FLIP to introduce
> > > InferMode.NONE.

Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-16 Thread Xia Sun
Hi Jeyhun, Muhammet,
Thanks for all the feedback!

> Could you please mention the default values for the new configurations
> (e.g., table.exec.hive.infer-source-parallelism.mode,
> table.exec.hive.infer-source-parallelism.enabled,
> etc) ?


Thanks for your suggestion. I have supplemented the explanation regarding
the default values.

> Since we are introducing the mode as a configuration option,
> could it make sense to have `InferMode.NONE` option also?
> The `NONE` option would disable the inference.


This is a good idea. Looking ahead, it could eliminate the need for
introducing a new configuration option. I haven't identified any potential
compatibility issues as yet. If there are no further ideas from others, I'll
go ahead and update the FLIP to introduce InferMode.NONE.

Best,
Xia

Muhammet Orazov  wrote on Wed, Apr 17, 2024 at 10:31:

> Hello Xia,
>
> Thanks for the FLIP!
>
> Since we are introducing the mode as a configuration option,
> could it make sense to have `InferMode.NONE` option also?
> The `NONE` option would disable the inference.
>
> This way we deprecate the `table.exec.hive.infer-source-parallelism`
> and no additional `table.exec.hive.infer-source-parallelism.enabled`
> option is required.
>
> What do you think?
>
> Best,
> Muhammet
>
> On 2024-04-16 07:07, Xia Sun wrote:
> > Hi everyone,
> > I would like to start a discussion on FLIP-445: Support dynamic
> > parallelism
> > inference for HiveSource[1].
> >
> > FLIP-379[2] has introduced dynamic source parallelism inference for
> > batch
> > jobs, which can utilize runtime information to more accurately decide
> > the
> > source parallelism. As a follow-up task, we plan to implement the
> > dynamic
> > parallelism inference interface for HiveSource, and also switch the
> > default
> > static parallelism inference to dynamic parallelism inference.
> >
> > Looking forward to your feedback and suggestions, thanks.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> >
> > Best regards,
> > Xia
>


[DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-16 Thread Xia Sun
Hi everyone,
I would like to start a discussion on FLIP-445: Support dynamic parallelism
inference for HiveSource[1].

FLIP-379[2] has introduced dynamic source parallelism inference for batch
jobs, which can utilize runtime information to more accurately decide the
source parallelism. As a follow-up task, we plan to implement the dynamic
parallelism inference interface for HiveSource, and also switch the default
static parallelism inference to dynamic parallelism inference.
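
As a rough, self-contained sketch of the shape this could take, based on the
FLIP-379 interface and the HiveSource pseudocode quoted elsewhere in this
archive (fileEnumerator.setMinNumSplits(...) and an inferred-parallelism
getter); every type and method name here is an illustrative assumption, not
the final API:

// Sketch only: how a Hive-style source might plug into dynamic inference.
interface DynamicParallelismInferenceSketch {
    interface Context {
        int getParallelismInferenceUpperBound();
    }
    int inferParallelism(Context context);
}

class HiveSourceSketch implements DynamicParallelismInferenceSketch {
    private final FileEnumeratorSketch fileEnumerator = new FileEnumeratorSketch();

    @Override
    public int inferParallelism(Context context) {
        int upperBound = context.getParallelismInferenceUpperBound();
        // Hint the enumerator so enough splits exist for the allowed parallelism,
        // then cap whatever it infers at the runtime-provided upper bound.
        fileEnumerator.setMinNumSplits(upperBound);
        return Math.min(fileEnumerator.getInferredSourceParallelism(), upperBound);
    }
}

class FileEnumeratorSketch {
    private int minNumSplits = 1;
    void setMinNumSplits(int minNumSplits) { this.minNumSplits = minNumSplits; }
    int getInferredSourceParallelism() { return minNumSplits; } // stand-in logic
}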

Looking forward to your feedback and suggestions, thanks.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs

Best regards,
Xia


Re: [ANNOUNCE] New Apache Flink Committer - Zakelly Lan

2024-04-15 Thread Xia Sun
Congratulations Zakelly!

Best,
Xia

Leonard Xu  wrote on Mon, Apr 15, 2024 at 16:16:

> Congratulations Zakelly!
>
>
> Best,
> Leonard
> > On Apr 15, 2024, at 3:56 PM, Samrat Deb  wrote:
> >
> > Congratulations Zakelly!
>
>


Re: Question around Flink's AdaptiveBatchScheduler

2024-04-15 Thread Xia Sun
Hi Venkat,
I agree that the parallelism of the source vertex should not be upper bounded
by the job's global max parallelism. The case you mentioned, "High filter
selectivity with huge amounts of data to read", supports this viewpoint
excellently. (In fact, in the current implementation, if the source
parallelism is pre-specified at the job creation stage, rather than relying
on the dynamic parallelism inference of the AdaptiveBatchScheduler, the
source vertex's parallelism can indeed exceed the job's global max
parallelism.)

As Lijie and Junrui pointed out, the key issue is "semantic consistency."
Currently, if a vertex has not set maxParallelism, the
AdaptiveBatchScheduler will use
`execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's
maxParallelism. Since the current implementation does not distinguish
between source vertices and downstream vertices, source vertices are also
subject to this limitation.

Therefore, I believe that if the issue of "semantic consistency" can be
well explained in the code and configuration documentation, the
AdaptiveBatchScheduler should allow the parallelism of source vertices to
exceed the job's global max parallelism.
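
For concreteness, the two options in question can be set as below (the key
strings are quoted in this thread; Configuration#setInteger is Flink's
standard API, and the values are arbitrary examples):

import org.apache.flink.configuration.Configuration;

public class SchedulerOptionsExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Parallelism to use for source vertices when nothing better is inferred.
        conf.setInteger(
                "jobmanager.adaptive-batch-scheduler.default-source-parallelism", 128);
        // Global cap that, in the current implementation, also bounds the
        // dynamically inferred source parallelism.
        conf.setInteger("jobmanager.adaptive-batch-scheduler.max-parallelism", 1024);
    }
}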

Best,
Xia

Venkatakrishnan Sowrirajan  wrote on Sun, Apr 14, 2024 at 10:31:

> Let me state why I think
> "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should not
> be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism".
>
>- Source vertex is unique and does not have any upstream vertices
>- Downstream vertices read shuffled data partitioned by key, which is
>not the case for the Source vertex
>- Limiting source parallelism by downstream vertices' max parallelism is
>incorrect
>
> If we say that for "semantic consistency" the source vertex parallelism has
> to be bound by the overall job's max parallelism, it can lead to the
> following issues:
>
>    - High filter selectivity with huge amounts of data to read - setting a
>    high "jobmanager.adaptive-batch-scheduler.max-parallelism" so that the
>    source parallelism can be set higher can lead to small blocks and
>    sub-optimal performance.
>    - Setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism"
>    requires careful tuning of network buffer configurations, which is
>    unnecessary when it is not otherwise required, just so that the source
>    parallelism can be set high.
>
> Regards
> Venkata krishnan
>
> On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee  wrote:
>
> > Hello Venkata krishnan,
> >
> > I think the term "semantic inconsistency" defined by
> > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
> maintaining a
> > uniform upper limit on parallelism across all vertices within a job. As
> the
> > source vertices are part of the global execution graph, they should also
> > respect this rule to ensure consistent application of parallelism
> > constraints.
> >
> > Best,
> > Junrui
> >
> > > Venkatakrishnan Sowrirajan  wrote on Fri, Apr 12, 2024 at 02:10:
> >
> > > Gentle bump on this question. cc @Becket Qin  as
> > > well.
> > >
> > > Regards
> > > Venkata krishnan
> > >
> > >
> > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
> > > vsowr...@asu.edu> wrote:
> > >
> > > > Thanks for the response Lijie and Junrui. Sorry for the late reply. A
> > > > few follow-up questions.
> > > >
> > > > > Source can actually ignore this limit
> > > > because it has no upstream, but this will lead to semantic inconsistency.
> > > >
> > > > Lijie, can you please elaborate on the above comment further? What do
> > > > you mean when you say it will lead to "semantic inconsistency"?
> > > >
> > > > > Secondly, we first need to limit the max parallelism of (downstream)
> > > > vertex, and then we can decide how many subpartitions (upstream vertex)
> > > > should produce. The limit should be effective, otherwise some
> > > > downstream tasks will have no data to process.
> > > >
> > > > This makes sense in the context of any vertices other than the source
> > > > vertex. As you mentioned above ("Source can actually ignore this limit
> > > > because it has no upstream"), I feel
> > > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism" need
> > > > not be upper bounded by
> > > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > > >
> > > > Regards
> > > > Venkata krishnan
> > > >
> > > >
> > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee  wrote:
> > > >
> > > >> Hi Venkat,
> > > >>
> > > >> As Lijie mentioned, in Flink, the parallelism is required to be less
> > > >> than or equal to the maximum parallelism. The config options
> > > >> jobmanager.adaptive-batch-scheduler.max-parallelism and
> > > >> jobmanager.adaptive-batch-scheduler.default-source-parallelism will
> > > >> be set as the source's max-parallelism and parallelism, respectively.
> > > >> Therefore, the check failure you encountered is in line with
> > > >> expectations.

Re: [ANNOUNCE] New Apache Flink PMC Member - Jing Ge

2024-04-14 Thread Xia Sun
Congratulations, Jing!

Best,
Xia

Ferenc Csaky  wrote on Sat, Apr 13, 2024 at 00:50:

> Congratulations, Jing!
>
> Best,
> Ferenc
>
>
>
> On Friday, April 12th, 2024 at 13:54, Ron liu  wrote:
>
> >
> >
> > Congratulations, Jing!
> >
> > Best,
> > Ron
> >
> > Junrui Lee jrlee@gmail.com wrote on Fri, Apr 12, 2024 at 18:54:
> >
> > > Congratulations, Jing!
> > >
> > > Best,
> > > Junrui
> > >
> > > Aleksandr Pilipenko z3d...@gmail.com wrote on Fri, Apr 12, 2024 at 18:28:
> > >
> > > > Congratulations, Jing!
> > > >
> > > > Best Regards,
> > > > Aleksandr
>


Re: [ANNOUNCE] New Apache Flink PMC Member - Lincoln Lee

2024-04-14 Thread Xia Sun
Congratulations Lincoln !

Best,
Xia

Ferenc Csaky  wrote on Sat, Apr 13, 2024 at 00:50:

> Congratulations, Lincoln!
>
> Best,
> Ferenc
>
>
>
>
> On Friday, April 12th, 2024 at 15:54, lorenzo.affe...@ververica.com.INVALID
>  wrote:
>
> >
> >
> > Huge congrats! Well done!
> > On Apr 12, 2024 at 13:56 +0200, Ron liu ron9@gmail.com, wrote:
> >
> > > Congratulations, Lincoln!
> > >
> > > Best,
> > > Ron
> > >
> > > Junrui Lee jrlee@gmail.com wrote on Fri, Apr 12, 2024 at 18:54:
> > >
> > > > Congratulations, Lincoln!
> > > >
> > > > Best,
> > > > Junrui
> > > >
> > > > Aleksandr Pilipenko z3d...@gmail.com wrote on Fri, Apr 12, 2024 at 18:29:
> > > >
> > > > > > Congratulations, Lincoln!
> > > > > >
> > > > > > Best Regards
> > > > > > Aleksandr
>


[RESULT][VOTE] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-12-05 Thread Xia Sun
Dear developers,

FLIP-379: Dynamic source parallelism inference for batch jobs[1] has been
accepted; the vote is recorded in this thread [2].

The proposal received 6 approving binding votes and no disapprovals:

- Zhu Zhu (binding)
- Lijie Wang (binding)
- Rui Fan (binding)
- Etienne Chauchot (binding)
- Leonard Xu (binding)
- Jingsong Li (binding)

Thanks to all involved.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
[2] https://lists.apache.org/thread/g03m2r8dodz6gn8jgf36mvq60h1tsnqg

Best,
Xia


[VOTE] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-29 Thread Xia Sun
Hi everyone,

I'd like to start a vote on FLIP-379: Dynamic source parallelism inference
for batch jobs[1] which has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an objection or
not enough votes.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
[2] https://lists.apache.org/thread/ocftkqy5d2x4n58wzprgm5qqrzzkbmb8


Best Regards,
Xia


Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-28 Thread Xia Sun
Hi everyone,

Thanks for all the comments! I will initiate the vote tomorrow if there is
no further discussion.

Best,
Xia

Leonard Xu  wrote on Fri, Nov 24, 2023 at 18:50:

> Thanks Xia and Zhu Zhu for driving this work,
>
> It will help unify the parallelism inference for all operators of a batch
> job; the updated FLIP looks good to me.
>
> Best,
> Leonard
>
>
> > On Nov 24, 2023, at 5:53 PM, Xia Sun  wrote:
> >
> > Hi all,
> > We discussed offline with Zhu Zhu and Leonard Xu and have reached the
> > following three points of consensus:
> >
> > 1. Rename the interface method Context#getMaxSourceParallelism proposed by
> > the FLIP to Context#getParallelismInferenceUpperBound, to make the meaning
> > of the method clearer. See [1] for details.
> >
> > 2. We provide a more detailed explanation of the effective priority of the
> > dynamic source parallelism inference proposed by this FLIP and the order
> > of values for the upper bound of source parallelism. We also point out the
> > current support and limitations of the AdaptiveBatchScheduler regarding
> > source parallelism inference. See [2] for details.
> >
> > 3. This FLIP will only focus on the framework-level implementation and
> > will prioritize the implementation of FileSource as an example of the new
> > interface proposed by the FLIP. The HiveSource, due to its existing static
> > parallelism inference and changes in the default values of configuration
> > items such as `table.exec.hive.infer-source-parallelism`, requires a more
> > detailed migration plan, as well as more comprehensive design and
> > discussion. It is not suitable as part of this FLIP and needs a separate
> > FLIP. Therefore, we have removed the HiveSource part from this FLIP.
> >
> > Thanks again to everyone who participated in the discussion.
> > Looking forward to your continued feedback.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-IntroduceDynamicParallelismInferenceinterfaceforSource
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-ConfigurationBehaviorChanges
> >
> > Best,
> > Xia
> >
> > Leonard Xu  wrote on Wed, Nov 22, 2023 at 18:37:
> >
> >> Thanks Xia for the reply, sorry for the late reply.
> >>
> >>> Thanks for pointing out the issue, the current wording does indeed seem
> >>> to be confusing. It involves the existing implementation of the
> >>> AdaptiveBatchScheduler, where the dynamically inferred parallelism
> >>> cannot exceed the JobVertex's maxParallelism (which is typically set to
> >>> either the global default max parallelism or the user-specified
> >>> JobVertex max parallelism), so the FLIP maintains that logic. I have
> >>> modified the FLIP to avoid confusion as much as possible.
> >>
> >> I didn't see the changed part in this FLIP; could you check it?
> >>
> >>> We can use Configuration::getOptional to check if the user has
> >>> configured the
> >>> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
> >>
> >> Using Readable#getOptional(ConfigOption option) makes sense to me.
> >>
> >>> As a follow-up task, we may have a dedicated discussion in the future to
> >>> see if we need to change the default value of
> >>> `table.exec.hive.infer-source-parallelism` to false. Before then, users
> >>> can manually set `table.exec.hive.infer-source-parallelism` to false to
> >>> enable dynamic parallelism inference, and use
> >>> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> >>> to replace `table.exec.hive.infer-source-parallelism.max` as the
> >>> parallelism inference upper bound. I have updated both the FLIP's
> >>> DynamicParallelismInference interface implementation and Migration Plan
> >>> modules to illustrate this.
> >>
> >> In my opinion, moving HiveSource to a subsequent discussion is not OK;
> >> see my explanation: HiveSource supporting dynamic source parallelism
> >> inference is one part of the FLIP implementation, and it looks like we
> >> introduce a configuration
> >> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
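
(For concreteness, a hedged sketch of the Configuration::getOptional check
discussed above. The key string comes from this thread; ConfigOptions and
getOptional are standard Flink configuration APIs, but the surrounding class
and method are invented for illustration.)

import java.util.Optional;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

class DefaultSourceParallelismCheck {
    // Declared without a default value so getOptional() can tell "unset"
    // apart from "explicitly set to 1".
    static final ConfigOption<Integer> DEFAULT_SOURCE_PARALLELISM = ConfigOptions
            .key("execution.batch.adaptive.auto-parallelism.default-source-parallelism")
            .intType()
            .noDefaultValue();

    static int resolveUpperBound(Configuration conf, int globalDefaultParallelism) {
        Optional<Integer> userValue = conf.getOptional(DEFAULT_SOURCE_PARALLELISM);
        return userValue.orElse(globalDefaultParallelism);
    }
}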

Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-24 Thread Xia Sun
> >>>> The dynamic source parallelism inference is a useful feature for the
> >>>> batch story. I've some comments about the current design.
> >>>>
> >>>> 1. How can users disable the parallelism inference if they want to use
> >>>> a fixed source parallelism? They can currently configure a fixed
> >>>> parallelism in the table layer, as you explained above.
> >>>>
> >>>> 2. Could you explain the priority between the static parallelism set
> >>>> from the table layer and the proposed dynamic source parallelism? And
> >>>> changing the default value of `table.exec.hive.infer-source-parallelism`
> >>>> as a sub-task does not resolve all cases, because other sources can set
> >>>> their own parallelism too.
> >>>>
> >>>> 3. The current design only works for batch jobs; the workflow for a
> >>>> streaming job may look like (1) infer parallelism for a streaming
> >>>> source like Kafka, (2) stop the job with a savepoint, (3) apply the new
> >>>> parallelism to the job, (4) schedule the streaming job from the
> >>>> savepoint, which is totally different, and the latter lacks a lot of
> >>>> infra in Flink, right? So, could we consider the boundedness info when
> >>>> designing the interface? Both FileSource and HiveSource offer streaming
> >>>> read ability. Imagine this case: a Flink streaming HiveSource should
> >>>> not apply the dynamic source parallelism even if it implemented the
> >>>> feature, as it is serving a streaming job.
> >>>>
> >>>> Best,
> >>>> Leonard
> >>>>
> >>>>
> >>>>> On Nov 1, 2023, at 6:21 PM, Xia Sun  wrote:
> >>>>>
> >>>>> Thanks Lijie for the comments!
> >>>>> 1. For Hive source, dynamic parallelism inference in batch scenarios
> >> is a
> >>>>> superset of static parallelism inference. As a follow-up task, we can
> >>>>> consider changing the default value of
> >>>>> 'table.exec.hive.infer-source-parallelism' to false.
> >>>>>
> >>>>> 2. I think that both dynamic parallelism inference and static
> >> parallelism
> >>>>> inference have their own use cases. Currently, for streaming sources
> >> and
> >>>>> other sources that are not sensitive to dynamic information, the
> >> benefits
> >>>>> of dynamic parallelism inference may not be significant. In such
> cases,
> >>>> we
> >>>>> can continue to use static parallelism inference.
> >>>>>
> >>>>> Thanks,
> >>>>> Xia
> >>>>>
> >>>>>> Lijie Wang  wrote on Wed, Nov 1, 2023 at 14:52:
> >>>>>
> >>>>>> Hi Xia,
> >>>>>>
> >>>>>> Thanks for driving this FLIP, +1 for the proposal.
> >>>>>>
> >>>>>> I have 2 questions about the relationship between static inference
> and
> >>>>>> dynamic inference:
> >>>>>>
> >>>>>> 1. AFAIK, currently the hive table source enable static inference by
> >>>>>> default. In this case, which one (static vs dynamic) will take
> effect
> >> ?
> >>>> I
> >>>>>> think it would be better if we can point this out in FLIP
> >>>>>>
> >>>>>> 2. As you mentioned above, dynamic inference is the most ideal way,
> so
> >>>> do
> >>>>>> we have plan to deprecate the static inference in the future?
> >>>>>>
> >>>>>> Best,
> >>>>>> Lijie
> >>>>>>
> >>>>>> Zhu Zhu  wrote on Tue, Oct 31, 2023 at 20:19:
> >>>>>>
> >>>>>>> Thanks for opening the FLIP and kicking off this discussion, Xia!
> >>>>>>> The proposed changes make up an important missing part of the
> dynamic
> >>>>>>> parallelism inference of adaptive batch scheduler.
> >>>>>>>
> >>>>>>> Besides that, it is also one good step towards supporting dynamic
> >>>>>>> parallelism inference for streaming sources, e.g. allowing Kafka
> >>>>>>> sources to determine its parallelism automatically based on the
> >>>>>>> number of partitions.
> >>>>>>>
> >>>>>>> +1 for the proposal.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Zhu
> >>>>>>>
> >>>>>>> Xia Sun  wrote on Tue, Oct 31, 2023 at 16:01:
> >>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>> I would like to start a discussion on FLIP-379: Dynamic source
> >>>>>>> parallelism
> >>>>>>>> inference for batch jobs[1].
> >>>>>>>>
> >>>>>>>> In general, there are three main ways to set source parallelism
> for
> >>>>>> batch
> >>>>>>>> jobs:
> >>>>>>>> (1) User-defined source parallelism.
> >>>>>>>> (2) Connector static parallelism inference.
> >>>>>>>> (3) Dynamic parallelism inference.
> >>>>>>>>
> >>>>>>>> Compared to manually setting parallelism, automatic parallelism
> >>>>>> inference
> >>>>>>>> is easier to use and can better adapt to varying data volumes each
> >>>> day.
> >>>>>>>> However, static parallelism inference cannot leverage runtime
> >>>>>>> information,
> >>>>>>>> resulting in inaccurate parallelism inference. Therefore, for
> batch
> >>>>>> jobs,
> >>>>>>>> dynamic parallelism inference is the most ideal, but currently,
> the
> >>>>>>> support
> >>>>>>>> for adaptive batch scheduler is not very comprehensive.
> >>>>>>>>
> >>>>>>>> Therefore, we aim to introduce a general interface that enables
> the
> >>>>>>>> adaptive batch scheduler to dynamically infer the source
> parallelism
> >>>> at
> >>>>>>>> runtime. Please refer to the FLIP[1] document for more details
> about
> >>>>>> the
> >>>>>>>> proposed design and implementation.
> >>>>>>>>
> >>>>>>>> I also thank Zhu Zhu and LiJie Wang for their suggestions during
> the
> >>>>>>>> pre-discussion.
> >>>>>>>> Looking forward to your feedback and suggestions, thanks.
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> >>>>>>>>
> >>>>>>>> Best regards,
> >>>>>>>> Xia
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
>
>


Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-20 Thread Xia Sun
> > since the new interface is implemented together with the Source interface,
> > the Source::getBoundedness() method can also be obtained when inferring
> > parallelism.
>
> +1 about the boundedness info part.
>
> Okay, let's come back to the batch world and discuss some details of the
> current design:
>
> (1) About max source parallelism
> The FLIP said: (1) Max source parallelism, which is calculated as the
> minimum of the default source parallelism
> (`execution.batch.adaptive.auto-parallelism.default-source-parallelism`)
> and JobVertex#maxParallelism. If the default-source-parallelism is not set,
> the global default parallelism is used as the default source parallelism.
>
> The 'Max source parallelism' is the information that the runtime offers to
> the Source as a hint to infer the actual parallelism; a name with a max
> prefix but calculated as a minimum value confuses me a lot, especially when
> I read the HiveSource pseudocode:
>
> fileEnumerator.setMinNumSplits(maxSourceParallelism);
>
> Although I understand that naming is a complex topic in CS, could we
> improve this method name a little? And, since the
> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> config has a default value of 1, how do we distinguish whether the user set
> it or not, for example if the user happens to set the value 1?
>
> (2) About changing default value of
> 'table.exec.hive.infer-source-parallelism'
> The FLIP said:  As a follow-up task, we will consider changing the default
> value of 'table.exec.hive.infer-source-parallelism' to false.
>
> No doubt it's an API-breaking change, and for existing Hive users the
> migration path is not clear in this FLIP. For example, users currently use
> the split count to infer the source parallelism; after this FLIP, could we
> give a recommended value for
> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`, or
> explain how to set it, or whether users do not need to set anything? And
> the replacement for migration should be added to
> 'table.exec.hive.infer-source-parallelism's description when we propose to
> change its default value, right?
>
> (3) [minor] About the HiveSource
> The pseudocode shows:
>
> fileEnumerator.getInferredSourceParallelism();
>
> IIRC, our public API FileEnumerator never offers such a method; is
> introducing getInferredSourceParallelism() also one part of this FLIP?
>
> Best,
> Leonard
>
>
>
> >
> > Best regards,
> > Xia
> >
> > Leonard Xu  wrote on Wed, Nov 8, 2023 at 16:19:
> >
> >> Thanks Xia and Zhu Zhu for kicking off this discussion.
> >>
> >> The dynamic source parallelism inference is a useful feature for the
> >> batch story. I've some comments about the current design.
> >>
> >> 1. How can users disable the parallelism inference if they want to use a
> >> fixed source parallelism? They can currently configure a fixed
> >> parallelism in the table layer, as you explained above.
> >>
> >> 2. Could you explain the priority between the static parallelism set from
> >> the table layer and the proposed dynamic source parallelism? And changing
> >> the default value of `table.exec.hive.infer-source-parallelism` as a
> >> sub-task does not resolve all cases, because other sources can set their
> >> own parallelism too.
> >>
> >> 3. The current design only works for batch jobs; the workflow for a
> >> streaming job may look like (1) infer parallelism for a streaming source
> >> like Kafka, (2) stop the job with a savepoint, (3) apply the new
> >> parallelism to the job, (4) schedule the streaming job from the
> >> savepoint, which is totally different, and the latter lacks a lot of
> >> infra in Flink, right? So, could we consider the boundedness info when
> >> designing the interface? Both FileSource and HiveSource offer streaming
> >> read ability. Imagine this case: a Flink streaming HiveSource should not
> >> apply the dynamic source parallelism even if it implemented the feature,
> >> as it is serving a streaming job.
> >>
> >> Best,
> >> Leonard
> >>
> >>
> >>> On Nov 1, 2023, at 6:21 PM, Xia Sun  wrote:
> >>>
> >>> Thanks Lijie for the comments!
> >>> 1. For Hive source, dynamic parallelism inference in batch scenarios
> is a
> >>> superset of static parallelism inference. As a follow-up task, we can
> >>> consider changing the default value of
> >>> 'table.exec.hive.infer-source-parallelism' to false.
> >>>
> >>> 2. I think that both dynamic parallelism inference and static
> parallelism
> >>> inference have their own use cases. Currently, for streaming sources
> and
> >>> other sources

Re: [VOTE] FLIP-381: Deprecate configuration getters/setters that return/set complex Java objects

2023-11-12 Thread Xia Sun
+1 (non-binding)

Best,
Xia

Samrat Deb  wrote on Mon, Nov 13, 2023 at 12:37:

> +1 (non binding)
>
> Bests,
> Samrat
>
> On Mon, 13 Nov 2023 at 9:10 AM, Yangze Guo  wrote:
>
> > +1 (binding)
> >
> > Best,
> > Yangze Guo
> >
> > On Mon, Nov 13, 2023 at 11:35 AM weijie guo 
> > wrote:
> > >
> > > +1(binding)
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Lijie Wang  wrote on Mon, Nov 13, 2023 at 10:40:
> > >
> > > > +1 (binding)
> > > >
> > > > Best,
> > > > Lijie
> > > >
> > > > Yuepeng Pan  wrote on Fri, Nov 10, 2023 at 18:32:
> > > >
> > > > > +1(non-binding)
> > > > >
> > > > > Best,
> > > > > Roc
> > > > >
> > > > > On 2023/11/10 03:58:10 Junrui Lee wrote:
> > > > > > Hi everyone,
> > > > > >
> > > > > > Thank you to everyone for the feedback on FLIP-381: Deprecate
> > > > > configuration
> > > > > > getters/setters that return/set complex Java objects[1] which has
> > been
> > > > > > discussed in this thread [2].
> > > > > >
> > > > > > I would like to start a vote for it. The vote will be open for at
> > least
> > > > > 72
> > > > > > hours (excluding weekends) unless there is an objection or not
> > enough
> > > > > votes.
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278464992
> > > > > > [2]
> > https://lists.apache.org/thread/y5owjkfxq3xs9lmpdbl6d6jmqdgbjqxo
> > > > > >
> > > > >
> > > >
> >
>


Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-09 Thread Xia Sun
Thanks Leonard for the feedback and sorry for my late response.

> `How user disable the parallelism inference if they want to use fixed
source parallelism?`
> `Could you explain the priority the static parallelism set from table
layer and the proposed dynamic source parallelism?`

From the user's perspective, if the user specifies a fixed parallelism for
the source, dynamic source parallelism inference will be automatically
disabled. From the perspective of priority, the user’s specified
parallelism > the static parallelism inference > dynamic parallelism
inference. Because the dynamic source parallelism inference will take
effect at the runtime stage and the validity conditions are: (1) the
current ExecutionGraph is a dynamic graph, and (2) the parallelism of the
source vertex is not specified (that is, the parallelism is -1).
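
(A tiny sketch of that priority chain; the -1 sentinel for "not set" is from
this thread, while the helper itself is hypothetical:)

class ParallelismPriority {
    // User-specified > static inference > dynamic inference; -1 means "not set".
    static int resolve(int userSpecified, int staticInferred, int dynamicInferred) {
        if (userSpecified != -1) return userSpecified;   // explicit setting always wins
        if (staticInferred != -1) return staticInferred; // static inference, decided before runtime
        return dynamicInferred;                          // dynamic inference, decided at runtime
    }
}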

> `the workflow for streaming job may looks like ... which is totally
different, the later one lacks a lot of infra in Flink, right?`

Indeed, as of now, the dynamic parallelism inference is exclusively for
batch jobs, so it only takes into account the necessary information for
batch scenarios. In the future, when we introduce support for automatic
parallelism inference in streaming jobs, we can include the required
information for streaming jobs to avoid unnecessarily complicating the
current design.
Moreover, the workflow you mentioned seems a bit complicated. Our current
idea is to perform the parallelism inference during the initialization
phase of streaming jobs and proceed to schedule the entire job once the
source parallelism is determined. This process will naturally occur during
job startup, eliminating the need for additional restarts.

> `So, could we consider the boundness info when design the interface? Both
FileSource and Hive Source offer streaming read ability, imaging this case:
Flink Streaming Hive Source should not apply the dynamic source parallelism
even it implemented the feature as it severing a streaming job.`

Thanks for your feedback, it is really good input. Currently, the dynamic
parallelism inference logic is only triggered in batch jobs. Therefore, the
logic will not be called in streaming jobs.
In the future, if streaming jobs also support runtime parallelism
inference, then theoretically, the source can no longer be distinguished
between streaming jobs and batch jobs at the runtime stage. In addition,
since the new interface is implemented together with the Source interface,
the Source::getBoundedness() method can also be obtained when inferring
parallelism.
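
(A hedged sketch of that check; Source and Boundedness are Flink's
connector-API types, while the guard method is illustrative:)

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;

class DynamicInferenceGuard {
    // Apply dynamic source parallelism inference only to bounded (batch) reads.
    static boolean dynamicInferenceApplicable(Source<?, ?, ?> source) {
        return source.getBoundedness() == Boundedness.BOUNDED;
    }
}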

Best regards,
Xia

Leonard Xu  wrote on Wed, Nov 8, 2023 at 16:19:

> Thanks Xia and Zhu Zhu for kicking off this discussion.
>
> The dynamic source parallelism inference is a useful feature for the batch
> story. I've some comments about the current design.
>
> 1. How can users disable the parallelism inference if they want to use a
> fixed source parallelism? They can currently configure a fixed parallelism
> in the table layer, as you explained above.
>
> 2. Could you explain the priority between the static parallelism set from
> the table layer and the proposed dynamic source parallelism? And changing
> the default value of `table.exec.hive.infer-source-parallelism` as a
> sub-task does not resolve all cases, because other sources can set their
> own parallelism too.
>
> 3. The current design only works for batch jobs; the workflow for a
> streaming job may look like (1) infer parallelism for a streaming source
> like Kafka, (2) stop the job with a savepoint, (3) apply the new
> parallelism to the job, (4) schedule the streaming job from the savepoint,
> which is totally different, and the latter lacks a lot of infra in Flink,
> right? So, could we consider the boundedness info when designing the
> interface? Both FileSource and HiveSource offer streaming read ability.
> Imagine this case: a Flink streaming HiveSource should not apply the
> dynamic source parallelism even if it implemented the feature, as it is
> serving a streaming job.
>
> Best,
> Leonard
>
>
> > On Nov 1, 2023, at 6:21 PM, Xia Sun  wrote:
> >
> > Thanks Lijie for the comments!
> > 1. For Hive source, dynamic parallelism inference in batch scenarios is a
> > superset of static parallelism inference. As a follow-up task, we can
> > consider changing the default value of
> > 'table.exec.hive.infer-source-parallelism' to false.
> >
> > 2. I think that both dynamic parallelism inference and static parallelism
> > inference have their own use cases. Currently, for streaming sources and
> > other sources that are not sensitive to dynamic information, the benefits
> > of dynamic parallelism inference may not be significant. In such cases,
> we
> > can continue to use static parallelism inference.
> >
> > Thanks,
> > Xia
> >
> > Lijie Wang  wrote on Wed, Nov 1, 2023 at 14:52:
> >
> >> Hi Xia,
> >&

Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-01 Thread Xia Sun
Thanks Lijie for the comments!
1. For Hive source, dynamic parallelism inference in batch scenarios is a
superset of static parallelism inference. As a follow-up task, we can
consider changing the default value of
'table.exec.hive.infer-source-parallelism' to false.

2. I think that both dynamic parallelism inference and static parallelism
inference have their own use cases. Currently, for streaming sources and
other sources that are not sensitive to dynamic information, the benefits
of dynamic parallelism inference may not be significant. In such cases, we
can continue to use static parallelism inference.

Thanks,
Xia

Lijie Wang  wrote on Wed, Nov 1, 2023 at 14:52:

> Hi Xia,
>
> Thanks for driving this FLIP, +1 for the proposal.
>
> I have 2 questions about the relationship between static inference and
> dynamic inference:
>
> 1. AFAIK, currently the hive table source enable static inference by
> default. In this case, which one (static vs dynamic) will take effect ? I
> think it would be better if we can point this out in FLIP
>
> 2. As you mentioned above, dynamic inference is the most ideal way, so do
> we have plan to deprecate the static inference in the future?
>
> Best,
> Lijie
>
> Zhu Zhu  wrote on Tue, Oct 31, 2023 at 20:19:
>
> > Thanks for opening the FLIP and kicking off this discussion, Xia!
> > The proposed changes make up an important missing part of the dynamic
> > parallelism inference of adaptive batch scheduler.
> >
> > Besides that, it is also one good step towards supporting dynamic
> > parallelism inference for streaming sources, e.g. allowing Kafka
> > sources to determine its parallelism automatically based on the
> > number of partitions.
> >
> > +1 for the proposal.
> >
> > Thanks,
> > Zhu
> >
> > Xia Sun  wrote on Tue, Oct 31, 2023 at 16:01:
> >
> > > Hi everyone,
> > > I would like to start a discussion on FLIP-379: Dynamic source
> > parallelism
> > > inference for batch jobs[1].
> > >
> > > In general, there are three main ways to set source parallelism for
> batch
> > > jobs:
> > > (1) User-defined source parallelism.
> > > (2) Connector static parallelism inference.
> > > (3) Dynamic parallelism inference.
> > >
> > > Compared to manually setting parallelism, automatic parallelism
> inference
> > > is easier to use and can better adapt to varying data volumes each day.
> > > However, static parallelism inference cannot leverage runtime
> > information,
> > > resulting in inaccurate parallelism inference. Therefore, for batch
> jobs,
> > > dynamic parallelism inference is the most ideal, but currently, the
> > support
> > > for adaptive batch scheduler is not very comprehensive.
> > >
> > > Therefore, we aim to introduce a general interface that enables the
> > > adaptive batch scheduler to dynamically infer the source parallelism at
> > > runtime. Please refer to the FLIP[1] document for more details about
> the
> > > proposed design and implementation.
> > >
> > > I also thank Zhu Zhu and LiJie Wang for their suggestions during the
> > > pre-discussion.
> > > Looking forward to your feedback and suggestions, thanks.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> > >
> > > Best regards,
> > > Xia
> > >
> >
>


[DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-10-31 Thread Xia Sun
Hi everyone,
I would like to start a discussion on FLIP-379: Dynamic source parallelism
inference for batch jobs[1].

In general, there are three main ways to set source parallelism for batch
jobs:
(1) User-defined source parallelism.
(2) Connector static parallelism inference.
(3) Dynamic parallelism inference.
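
To make way (1) concrete, here is a minimal DataStream sketch of a user
pinning the source parallelism by hand; the source and the value 4 are
arbitrary placeholders.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UserDefinedSourceParallelism {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Way (1): the user fixes the source parallelism by hand. Choosing
        // a good value requires knowing the data volume in advance, which
        // is exactly what parallelism inference is meant to avoid.
        env.fromSequence(0, 1_000_000).setParallelism(4).print();

        env.execute("user-defined-source-parallelism");
    }
}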

Compared to manually setting parallelism, automatic parallelism inference
is easier to use and can better adapt to varying data volumes each day.
However, static parallelism inference cannot leverage runtime information,
which can result in inaccurate parallelism. Therefore, for batch jobs,
dynamic parallelism inference is the ideal approach; however, the adaptive
batch scheduler's support for it is currently not very comprehensive.

To address this, we aim to introduce a general interface that enables the
adaptive batch scheduler to dynamically infer the source parallelism at
runtime. Please refer to the FLIP[1] document for more details about the
proposed design and implementation.
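
For readers who want a feel for the shape such an interface could take,
below is a purely illustrative sketch; every name and signature here is
hypothetical, and the actual proposal is specified in the FLIP document.

import java.util.OptionalLong;

// Hypothetical illustration only -- not the API proposed in FLIP-379.
public interface DynamicSourceParallelismInference {

    // Called by the scheduler at runtime, once dynamic information such
    // as the data volume to be read by the source is available.
    int inferParallelism(Context context);

    interface Context {
        // The maximum parallelism the scheduler allows for this source.
        int getParallelismUpperBound();

        // The desired data volume per task, if configured.
        OptionalLong getDataVolumePerTask();
    }
}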

I also thank Zhu Zhu and Lijie Wang for their suggestions during the
pre-discussion.
Looking forward to your feedback and suggestions, thanks.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs

Best regards,
Xia


Re: [VOTE] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-24 Thread Xia Sun
+1 (non-binding)

Best Regards,

Xia

yuxia wrote on Sun, Jun 25, 2023 at 09:23:

> +1 (binding)
> Thanks Lijie for driving it.
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Yuepeng Pan"
> To: "dev"
> Sent: Saturday, June 24, 2023, 9:06:53 PM
> Subject: Re: [VOTE] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
>
> +1 (non-binding)
>
> Thanks,
> Yuepeng Pan
>
> At 2023-06-23 23:49:53, "Lijie Wang"  wrote:
> >Hi all,
> >
> >Thanks for all the feedback about FLIP-324: Introduce Runtime Filter
> >for Flink Batch Jobs[1]. This FLIP was discussed in [2].
> >
> >I'd like to start a vote for it. The vote will be open for at least 72
> >hours (until June 29th 12:00 GMT) unless there is an objection or
> >insufficient votes.
> >
> >[1]
> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> >[2] https://lists.apache.org/thread/mm0o8fv7x7k13z11htt88zhy7lo8npmg
> >
> >Best,
> >Lijie
>


Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-12 Thread Xia Sun
Hi Yuxin,

Thanks for creating this FLIP!
I'm a Flink user, and in our internal scenario we use colocation to run
Flink jobs and online services together on the same machines. We have found
that Flink jobs are occasionally affected by other, non-Flink workloads
(e.g., if the host disk fills up, Flink jobs fail with a 'No space left on
device' error). This FLIP will really help us benefit from hybrid shuffle
without worrying about insufficient disk space.

I also have a few questions.
1. If the same subpartition spans multiple storage tiers, how do we keep
the order of segments across the tiers, if that is necessary? (See the
sketch after question 3 for the reading pattern I have in mind.)
2. While a subpartition is being written to the local disk, what happens if
the disk is found to be full? Will it report an error or automatically fall
back to remote storage?
3. For remote storage, I noticed that it uses direct reading, which differs
from the other two tiers. Does switching between tiers introduce overhead
or waiting? In addition, compared to Flink remote shuffle service (RSS)
implementations, which optimize data compression and small-file merging to
improve throughput and relieve file system pressure, can an object storage
system meet the performance requirements and concurrent-access challenges
of large-scale batch jobs (parallelism > 1)?
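
To make question 1 concrete, here is a purely illustrative sketch of the
priority-ordered segment lookup as I understand it from the FLIP; all type
and method names are made up, and ordering holds as long as segment ids are
consumed sequentially per subpartition.

import java.util.List;
import java.util.Optional;

interface TierReader {
    // Returns the segment's data if this tier currently holds it.
    Optional<byte[]> readSegment(int subpartitionId, int segmentId);
}

class PriorityTierLookup {
    private final List<TierReader> tiersByPriority; // e.g. memory, disk, remote

    PriorityTierLookup(List<TierReader> tiersByPriority) {
        this.tiersByPriority = tiersByPriority;
    }

    // Probe tiers from highest to lowest priority. Because segment ids are
    // assigned in order per subpartition, reading segment 0, 1, 2, ... in
    // sequence preserves record order even when consecutive segments live
    // in different tiers.
    Optional<byte[]> read(int subpartitionId, int segmentId) {
        for (TierReader tier : tiersByPriority) {
            Optional<byte[]> data = tier.readSegment(subpartitionId, segmentId);
            if (data.isPresent()) {
                return data;
            }
        }
        return Optional.empty();
    }
}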

Thanks,
Xia

Zhu Zhu wrote on Fri, Mar 10, 2023 at 16:44:

> Hi Yuxin,
>
> Thanks for creating this FLIP!
> The idea of tiered storage looks good. Instead of committing to a single
> storage, it can help balance performance, cost, and stability. It also
> has the potential to adaptively select the proper tiers according to more
> runtime information, achieving better performance and ease of use.
>
> I have a question about how the tier holding the data to be read is
> found. The FLIP proposes that the Read Client ask each storage tier
> whether a given segment exists in it, from higher-priority tiers to
> lower-priority ones. I'm a bit concerned about the cost of this,
> especially when data is written to low-priority tiers. Do you have any
> evaluation of it? Is it possible to let the Read Client know the location
> of the next segment once it has finished reading a segment? Or maybe just
> let it know whether the next segment is located in the same tier, if we
> can assume that tier changes are not very frequent.
>
> Thanks,
> Zhu
>
> Weihua Hu wrote on Fri, Mar 10, 2023 at 11:52:
> >
> > Thanks Yuxin for your explanation.
> >
> > That sounds reasonable. Looking forward to the new shuffle.
> >
> >
> > Best,
> > Weihua
> >
> >
> > On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan 
> wrote:
> >
> > > Hi, Weihua,
> > > Thanks for the questions and the ideas.
> > >
> > > > 1. How much of a performance regression would there be if we only
> > > > used remote storage?
> > >
> > > The new architecture can support using remote storage only, but this
> > > FLIP's target is to improve job stability. The changes in the FLIP are
> > > already quite complex, and the goal of the first version is to update
> > > Hybrid Shuffle to the new architecture and support remote storage as a
> > > supplement. Performance is not the first priority for this version, so
> > > we haven't tested the performance of using only remote storage. If
> > > there are indeed regressions, we will keep optimizing remote storage
> > > performance until a remote-storage-only setup is viable in production
> > > environments.
> > >
> > > > 2. Shall we move the local data to remote storage if the producer
> > > > has been finished for a long time?
> > >
> > > I agree that it is a good idea, as it can release TaskManager
> > > resources more promptly. But moving data from the TM's local disk to
> > > remote storage needs more detailed discussion and design, and it will
> > > be easier to implement on top of the new architecture. Considering the
> > > complexity, the target focus, and the iteration cycle of this FLIP, we
> > > decided not to include the details in the first version. We will
> > > extend and implement them in subsequent versions.
> > >
> > > Best,
> > > Yuxin
> > >
> > >
> > > Weihua Hu wrote on Thu, Mar 9, 2023 at 11:22:
> > >
> > > > Hi, Yuxin
> > > >
> > > > Thanks for driving this FLIP.
> > > >
> > > > Remote storage shuffle could improve the stability of batch jobs.
> > > >
> > > > In our internal scenario, we use a hybrid cluster to run both
> > > > streaming (high priority) and batch (low priority) jobs. When there
> > > > are not enough resources (e.g., when CPU usage reaches a threshold),
> > > > the batch containers are evicted, causing some batch tasks to be
> > > > re-run.
> > > >
> > > > It would be a great help if the remote storage could address this.
> > > > So I have a few questions.
> > > >
> > > > 1. How much of a performance regression would there be if we only
> > > > used remote storage?
> > > >
> > > > 2. In the current design, the shuffle data segment will