Re: [ANNOUNCE] New Apache Flink PMC Member - Fan Rui
Congratulations, Rui!

Best,
Xia

Paul Lam wrote on Thu, Jun 6, 2024 at 11:59:
> Congrats, Rui!
>
> Best,
> Paul Lam

Junrui Lee wrote on Thu, Jun 6, 2024 at 11:02:
> Congratulations, Rui.
>
> Best,
> Junrui

Hang Ruan wrote on Thu, Jun 6, 2024 at 10:35:
> Congratulations, Rui!
>
> Best,
> Hang

Samrat Deb wrote on Thu, Jun 6, 2024 at 10:28:
> Congratulations Rui
>
> Bests,
> Samrat

Yuxin Tan wrote on Thu, Jun 6, 2024 at 7:45:
> Congratulations, Rui!
>
> Best,
> Yuxin

Xuannan Su wrote on Thu, Jun 6, 2024 at 09:58:
> Congratulations!
>
> Best regards,
> Xuannan

Hangxiang Yu wrote on Thu, Jun 6, 2024 at 9:53:
> Congratulations, Rui!
>
> Best,
> Hangxiang

Lincoln Lee wrote on Thu, Jun 6, 2024 at 9:18:
> Congratulations, Rui!
>
> Best,
> Lincoln Lee

Lijie Wang wrote on Thu, Jun 6, 2024 at 09:11:
> Congratulations, Rui!
>
> Best,
> Lijie

Rodrigo Meneses wrote on Wed, Jun 5, 2024 at 21:35:
> All the best

xiangyu feng <xiangyu...@gmail.com> wrote on Wed, Jun 5, 2024 at 5:56:
> Congratulations, Rui!
>
> Regards,
> Xiangyu Feng

Feng Jin wrote on Wed, Jun 5, 2024 at 20:42:
> Congratulations, Rui!
>
> Best,
> Feng Jin

Yanfei Lei <fredia...@gmail.com> wrote on Wed, Jun 5, 2024 at 20:23:
> Congratulations, Rui!
>
> Best,
> Yanfei

Luke Chen wrote on Wed, Jun 5, 2024 at 20:08:
> Congrats, Rui!
>
> Luke

Jiabao Sun <jiabao...@apache.org> wrote on Wed, Jun 5, 2024 at 20:02:
> Congrats, Rui. Well-deserved!
>
> Best,
> Jiabao

Zhanghao Chen wrote on Wed, Jun 5, 2024 at 19:29:
> Congrats, Rui!
>
> Best,
> Zhanghao Chen

Piotr Nowojski wrote on Wednesday, June 5, 2024, 18:01
To: dev; rui fan <1996fan...@gmail.com>
Subject: [ANNOUNCE] New Apache Flink PMC Member - Fan Rui
> Hi everyone,
>
> On behalf of the PMC, I'm very happy to announce another new Apache
> Flink PMC Member - Fan Rui.
>
> Rui has been active in the community since August 2019. During this
> time he has contributed a lot of new features. Among others:
> - Decoupling the Autoscaler from the Kubernetes Operator, and
>   supporting the Standalone Autoscaler
> - Improvements to checkpointing, flamegraphs, restart strategies,
>   watermark alignment, and network shuffles
> - Optimizing the memory and CPU usage of large operators, greatly
>   reducing the risk and probability of TaskManager OOM
>
> He reviewed a significant amount of PRs and has been active both on the
> mailing lists and in Jira, helping to both maintain and grow Apache
> Flink's community. He is also our current Flink 1.20 release manager.
>
> In the last 12 months, Rui has been the most active contributor in the
> Flink Kubernetes Operator project, while being the 2nd most active
> Flink contributor at the same time.
>
> Please join me in welcoming and congratulating Fan Rui!
>
> Best,
> Piotrek (on behalf of the Flink PMC)
[RESULT][VOTE] FLIP-445: Support dynamic parallelism inference for HiveSource
Hi all,

FLIP-445: Support dynamic parallelism inference for HiveSource [1] has been
accepted and voted through this thread [2].

The proposal has been accepted with 6 approving votes (5 binding) and there
is no disapproval:

- Muhammet Orazov (non-binding)
- Rui Fan (binding)
- Ron Liu (binding)
- Zhu Zhu (binding)
- Lijie Wang (binding)
- yuxia (binding)

Thanks to all involved.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
[2] https://lists.apache.org/thread/lktnb162l2z3042m76to6xfbsdndy4r7

Best,
Xia
[VOTE] FLIP-445: Support dynamic parallelism inference for HiveSource
Hi everyone,

I'd like to start a vote on FLIP-445: Support dynamic parallelism inference
for HiveSource [1], which has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an objection or
not enough votes.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
[2] https://lists.apache.org/thread/4k1qx6lodhbkknkhjyl0lq9bx8fcpjvn

Best,
Xia
Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource
Hi Venkat,

Thanks for joining the discussion. Based on our understanding, there is
still a significant number of existing jobs using Hive. Indeed, many
companies are now migrating their data to the lakehouse, but due to
historical reasons, a substantial amount of data still resides in Hive.

Best,
Xia

Venkatakrishnan Sowrirajan wrote on Thu, Apr 25, 2024 at 11:52:
> Hi Xia,
>
> +1 on introducing dynamic parallelism inference for HiveSource.
>
> Orthogonal to this discussion: curious, how commonly is HiveSource used
> these days in the industry, given the popularity of table
> formats/sources like Iceberg, Hudi, and Delta Lake?
>
> Thanks
> Venkat
Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource
Hi everyone,

Thanks for all the feedback! If there are no more comments, I would like
to start the vote thread. Thanks again!

Best,
Xia

Ahmed Hamdy wrote on Thu, Apr 18, 2024 at 21:31:
> Hi Xia,
> I have read through the FLIP and discussion, and the new version of the
> FLIP looks better.
> +1 for the proposal.
> Best Regards
> Ahmed Hamdy
Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource
Hi Ron,

Yes, presenting it in a table might be more intuitive. I have already
added the table in the "Public Interfaces | New Config Option" chapter of
the FLIP. PTAL~

Ron Liu wrote on Thu, Apr 18, 2024 at 18:10:
> Hi, Xia
>
> Thanks for your reply.
>
> > That means, in terms of priority,
> > `table.exec.hive.infer-source-parallelism` >
> > `table.exec.hive.infer-source-parallelism.mode`.
>
> I still have some confusion: if
> `table.exec.hive.infer-source-parallelism` takes priority over
> `table.exec.hive.infer-source-parallelism.mode`, and
> `table.exec.hive.infer-source-parallelism` currently defaults to true,
> does that mean static parallelism inference always applies? Or perhaps
> after this FLIP, the default behavior of
> `table.exec.hive.infer-source-parallelism` is changed so that enabling
> it means dynamic parallelism inference.
> I think you should list in the FLIP, in a table, the behaviors that
> result when these two options coexist; only then can users know how
> dynamic and static parallelism inference work.
>
> Best,
> Ron
Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource
Hi Ron and Lijie,

Thanks for joining the discussion and sharing your suggestions.

> the InferMode class should also be introduced in the Public Interfaces
> section!

Thanks for the reminder, I have now added the InferMode class to the
Public Interfaces section as well.

> `table.exec.hive.infer-source-parallelism.max` is 1024, I checked
> through the code that the default value is 1000?

I have checked, and the default value of
`table.exec.hive.infer-source-parallelism.max` is indeed 1000. This has
been corrected in the FLIP.

> how are `table.exec.hive.infer-source-parallelism` and
> `table.exec.hive.infer-source-parallelism.mode` compatible?

This is indeed a critical point. The current plan is to deprecate
`table.exec.hive.infer-source-parallelism` but still utilize it as the
main switch for enabling automatic parallelism inference. That means, in
terms of priority, `table.exec.hive.infer-source-parallelism` >
`table.exec.hive.infer-source-parallelism.mode`. In future versions, if
`table.exec.hive.infer-source-parallelism` is removed, this logic will
also need to be revised, leaving only
`table.exec.hive.infer-source-parallelism.mode` as the basis for deciding
whether to enable parallelism inference. I have also added this
description to the FLIP.

> In FLIP-367 it is supported to be able to set the Source's parallelism
> individually. If in the future HiveSource also supports this feature,
> while the default value of
> `table.exec.hive.infer-source-parallelism.mode` is `InferMode.DYNAMIC`,
> will the parallelism be dynamically derived or will the manually set
> parallelism take effect, and which has the higher priority?

From my understanding, manually set parallelism has the higher priority,
just as one of the preconditions for dynamic parallelism inference to
take effect in the AdaptiveBatchScheduler is that the vertex's
parallelism isn't set. I believe that whether it's static inference or
dynamic inference, the parallelism manually set by the user should be
respected.

> The `InferMode.NONE` option.

Currently, adding InferMode.NONE seems to be the prevailing opinion. I
will add InferMode.NONE as one of the enum options in the InferMode
class.

Best,
Xia

Lijie Wang wrote on Thu, Apr 18, 2024 at 13:50:
> Thanks for driving the discussion.
>
> +1 for the proposal and +1 for the `InferMode.NONE` option.
>
> Best,
> Lijie
>
> Ron liu wrote on Thu, Apr 18, 2024 at 11:36:
> > Hi, Xia
> >
> > Thanks for driving this FLIP.
> >
> > This proposal looks good to me overall. However, I have the following
> > minor questions:
> >
> > 1. The FLIP introduced `table.exec.hive.infer-source-parallelism.mode`
> > as a new parameter, and its value is the enum class `InferMode`; I
> > think the InferMode class should also be introduced in the Public
> > Interfaces section!
> > 2. You mentioned in the FLIP that the default value of
> > `table.exec.hive.infer-source-parallelism.max` is 1024; I checked
> > through the code that the default value is 1000?
> > 3. I also agree with Muhammet's idea that there is no need to
> > introduce the option
> > `table.exec.hive.infer-source-parallelism.enabled`, and that expanding
> > the InferMode values will fulfill the need. There is another issue to
> > consider here though: how are
> > `table.exec.hive.infer-source-parallelism` and
> > `table.exec.hive.infer-source-parallelism.mode` compatible?
> > 4. In FLIP-367 it is supported to set the Source's parallelism
> > individually. If in the future HiveSource also supports this feature,
> > while the default value of
> > `table.exec.hive.infer-source-parallelism.mode` is
> > `InferMode.DYNAMIC`, will the parallelism be dynamically derived or
> > will the manually set parallelism take effect, and which has the
> > higher priority?
> >
> > Best,
> > Ron
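The compatibility rule described in this thread (the deprecated boolean switch keeps priority over the new mode option, and manually set parallelism beats any inference) can be sketched as follows. This is a minimal illustrative model, not Flink's actual implementation; the config keys are the real option names discussed above, but the method names and types here are hypothetical.

```java
import java.util.OptionalInt;

class SourceParallelismResolution {
    enum InferMode { STATIC, DYNAMIC, NONE }

    /**
     * Hypothetical resolution of the effective inference mode. Per the
     * discussion: the deprecated `table.exec.hive.infer-source-parallelism`
     * stays the main on/off switch; only when it is true does
     * `table.exec.hive.infer-source-parallelism.mode` decide STATIC vs. DYNAMIC.
     */
    static InferMode effectiveMode(boolean inferSourceParallelism, InferMode mode) {
        if (!inferSourceParallelism) {
            return InferMode.NONE; // main switch off: no inference at all
        }
        return mode; // switch on: the mode option decides
    }

    /**
     * Manually set parallelism always wins over an inferred value, mirroring
     * the AdaptiveBatchScheduler precondition that dynamic inference only
     * applies when the vertex's parallelism is not already set.
     */
    static int effectiveParallelism(OptionalInt manuallySet, int inferred) {
        return manuallySet.orElse(inferred);
    }
}
```

If the legacy switch is later removed, `effectiveMode` would collapse to returning `mode` directly, which matches the migration path Xia describes.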
Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource
Hi Jeyhun, Muhammet,

Thanks for all the feedback!

> Could you please mention the default values for the new configurations
> (e.g., table.exec.hive.infer-source-parallelism.mode,
> table.exec.hive.infer-source-parallelism.enabled, etc.)?

Thanks for your suggestion. I have supplemented the explanation regarding
the default values.

> Since we are introducing the mode as a configuration option,
> could it make sense to have an `InferMode.NONE` option also?
> The `NONE` option would disable the inference.

This is a good idea. Looking ahead, it could eliminate the need for
introducing a new configuration option. I haven't identified any
potential compatibility issues so far. If there are no further ideas from
others, I'll go ahead and update the FLIP to introduce InferMode.NONE.

Best,
Xia

Muhammet Orazov wrote on Wed, Apr 17, 2024 at 10:31:
> Hello Xia,
>
> Thanks for the FLIP!
>
> Since we are introducing the mode as a configuration option,
> could it make sense to have an `InferMode.NONE` option also?
> The `NONE` option would disable the inference.
>
> This way we deprecate `table.exec.hive.infer-source-parallelism`
> and no additional `table.exec.hive.infer-source-parallelism.enabled`
> option is required.
>
> What do you think?
>
> Best,
> Muhammet
[DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource
Hi everyone,

I would like to start a discussion on FLIP-445: Support dynamic
parallelism inference for HiveSource [1].

FLIP-379 [2] has introduced dynamic source parallelism inference for
batch jobs, which can utilize runtime information to decide the source
parallelism more accurately. As a follow-up task, we plan to implement
the dynamic parallelism inference interface for HiveSource, and also
switch the default from static parallelism inference to dynamic
parallelism inference.

Looking forward to your feedback and suggestions, thanks.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs

Best regards,
Xia
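The FLIP-379 hook that this proposal would implement for HiveSource roughly has the shape sketched below. This is a paraphrased, self-contained model for illustration only: the real `DynamicParallelismInference` interface and its context live in Flink's source API (with `Context#getParallelismInferenceUpperBound` as settled in the FLIP-379 discussion), and the split-counting strategy here is a hypothetical stand-in for whatever HiveSource would actually do.

```java
class DynamicInferenceSketch {
    /** Simplified stand-in for the runtime context the scheduler passes to sources. */
    interface Context {
        int getParallelismInferenceUpperBound();
    }

    /** Simplified stand-in for FLIP-379's DynamicParallelismInference interface. */
    interface DynamicParallelismInference {
        int inferParallelism(Context context);
    }

    /**
     * A toy "HiveSource-like" strategy: one subtask per split, capped at
     * runtime by the upper bound supplied through the context, and at
     * least 1 even when there are no splits.
     */
    static int inferFromSplits(int splitCount, Context context) {
        return Math.max(1, Math.min(splitCount, context.getParallelismInferenceUpperBound()));
    }
}
```

The point of the dynamic variant is exactly that `splitCount` and the upper bound are only known at runtime, which is what lets the AdaptiveBatchScheduler decide a more accurate source parallelism than a static pre-job estimate.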
Re: [ANNOUNCE] New Apache Flink Committer - Zakelly Lan
Congratulations Zakelly!

Best,
Xia

Leonard Xu wrote on Mon, Apr 15, 2024 at 16:16:
> Congratulations Zakelly!
>
> Best,
> Leonard
>
> On Apr 15, 2024, at 15:56, Samrat Deb wrote:
> > Congratulations Zakelly!
Re: Question around Flink's AdaptiveBatchScheduler
Hi Venkat,

I agree that the parallelism of the source vertex should not be upper
bounded by the job's global max parallelism. The case you mentioned,

>> High filter selectivity with huge amounts of data to read

excellently supports this viewpoint. (In fact, in the current
implementation, if the source parallelism is pre-specified at job
creation stage, rather than relying on the dynamic parallelism inference
of the AdaptiveBatchScheduler, the source vertex's parallelism can indeed
exceed the job's global max parallelism.)

As Lijie and Junrui pointed out, the key issue is "semantic consistency."
Currently, if a vertex has not set maxParallelism, the
AdaptiveBatchScheduler will use
`execution.batch.adaptive.auto-parallelism.max-parallelism` as the
vertex's maxParallelism. Since the current implementation does not
distinguish between source vertices and downstream vertices, source
vertices are also subject to this limitation.

Therefore, I believe that if the issue of "semantic consistency" can be
well explained in the code and configuration documentation, the
AdaptiveBatchScheduler should allow the parallelism of source vertices to
exceed the job's global max parallelism.

Best,
Xia

Venkatakrishnan Sowrirajan wrote on Sun, Apr 14, 2024 at 10:31:
> Let me state why I think
> "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should
> not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism":
>
> - The source vertex is unique and does not have any upstream vertices.
> - Downstream vertices read shuffled data partitioned by key, which is
>   not the case for the source vertex.
> - Limiting source parallelism by downstream vertices' max parallelism
>   is incorrect.
>
> If we say that for "semantic consistency" the source vertex parallelism
> has to be bound by the overall job's max parallelism, it can lead to
> the following issues:
>
> - High filter selectivity with huge amounts of data to read: setting a
>   high "jobmanager.adaptive-batch-scheduler.max-parallelism" just so
>   that source parallelism can be set higher can lead to small blocks
>   and sub-optimal performance.
> - Setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism"
>   requires careful tuning of network buffer configurations, which is
>   unnecessary in cases where it is not otherwise required, just so
>   that the source parallelism can be set high.
>
> Regards
> Venkata krishnan
>
> On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee wrote:
> > Hello Venkata krishnan,
> >
> > I think the term "semantic inconsistency" defined by
> > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
> > maintaining a uniform upper limit on parallelism across all vertices
> > within a job. As the source vertices are part of the global execution
> > graph, they should also respect this rule to ensure consistent
> > application of parallelism constraints.
> >
> > Best,
> > Junrui
> >
> > Venkatakrishnan Sowrirajan wrote on Fri, Apr 12, 2024 at 02:10:
> > > Gentle bump on this question. cc @Becket Qin as well.
> > >
> > > Regards
> > > Venkata krishnan
> > >
> > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan
> > > <vsowr...@asu.edu> wrote:
> > > > Thanks for the response Lijie and Junrui. Sorry for the late
> > > > reply. A few follow-up questions.
> > > >
> > > > > Source can actually ignore this limit because it has no
> > > > > upstream, but this will lead to semantic inconsistency.
> > > >
> > > > Lijie, can you please elaborate on the above comment further?
> > > > What do you mean when you say it will lead to "semantic
> > > > inconsistency"?
> > > >
> > > > > Secondly, we first need to limit the max parallelism of the
> > > > > (downstream) vertex, and then we can decide how many
> > > > > subpartitions the (upstream) vertex should produce. The limit
> > > > > should be effective, otherwise some downstream tasks will have
> > > > > no data to process.
> > > >
> > > > This makes sense in the context of any vertices other than the
> > > > source vertex. As you mentioned above ("Source can actually
> > > > ignore this limit because it has no upstream"), I therefore feel
> > > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > > > need not be upper bounded by
> > > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > > >
> > > > Regards
> > > > Venkata krishnan
> > > >
> > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee wrote:
> > > > > Hi Venkat,
> > > > >
> > > > > As Lijie mentioned, in Flink the parallelism is required to be
> > > > > less than or equal to the maximum parallelism. The values of
> > > > > jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > > > > and jobmanager.adaptive-batch-scheduler.max-parallelism will be
> > > > > set as the source's parallelism and max-parallelism,
> > > > > respectively. Therefore, the check-failed situation you
> > > > > encountered is in line with expectations.
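The clamping behavior this thread debates, where the scheduler falls back to the global `jobmanager.adaptive-batch-scheduler.max-parallelism` whenever a vertex has no maxParallelism of its own and then caps the inferred parallelism at that bound, can be modeled roughly as below. A simplified sketch for illustration, not the actual AdaptiveBatchScheduler code; representing "unset" as -1 is an assumption of this model.

```java
class AdaptiveBatchClampingSketch {
    /**
     * Decide a vertex's parallelism under the current (debated) semantics:
     * if the vertex did not set its own maxParallelism (modeled here as -1),
     * fall back to the global jobmanager.adaptive-batch-scheduler.max-parallelism,
     * then clamp the dynamically inferred parallelism to that bound.
     * Source vertices are not special-cased, which is exactly the
     * limitation discussed in this thread.
     */
    static int decideParallelism(
            int inferredParallelism, int vertexMaxParallelism, int globalMaxParallelism) {
        int upperBound = vertexMaxParallelism > 0 ? vertexMaxParallelism : globalMaxParallelism;
        return Math.min(inferredParallelism, upperBound);
    }
}
```

Under this model, a source whose data volume suggests 2000 subtasks is silently cut back to the global bound (e.g. 1024) unless the user raises the global max, which is the tuning burden Venkat objects to.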
Re: [ANNOUNCE] New Apache Flink PMC Member - Jing Ge
Congratulations, Jing!

Best,
Xia

Ferenc Csaky wrote on Sat, Apr 13, 2024 at 00:50:
> Congratulations, Jing!
>
> Best,
> Ferenc
>
> On Friday, April 12th, 2024 at 13:54, Ron liu wrote:
> > Congratulations, Jing!
> >
> > Best,
> > Ron
> >
> > Junrui Lee wrote on Fri, Apr 12, 2024 at 18:54:
> > > Congratulations, Jing!
> > >
> > > Best,
> > > Junrui
> > >
> > > Aleksandr Pilipenko wrote on Fri, Apr 12, 2024 at 18:28:
> > > > Congratulations, Jing!
> > > >
> > > > Best Regards,
> > > > Aleksandr
Re: [ANNOUNCE] New Apache Flink PMC Member - Lincoln Lee
Congratulations Lincoln!

Best,
Xia

Ferenc Csaky wrote on Sat, Apr 13, 2024 at 00:50:
> Congratulations, Lincoln!
>
> Best,
> Ferenc
>
> On Friday, April 12th, 2024 at 15:54, lorenzo.affe...@ververica.com.INVALID wrote:
> > Huge congrats! Well done!
> >
> > On Apr 12, 2024 at 13:56 +0200, Ron liu wrote:
> > > Congratulations, Lincoln!
> > >
> > > Best,
> > > Ron
> > >
> > > Junrui Lee wrote on Fri, Apr 12, 2024 at 18:54:
> > > > Congratulations, Lincoln!
> > > >
> > > > Best,
> > > > Junrui
> > > >
> > > > Aleksandr Pilipenko wrote on Fri, Apr 12, 2024 at 18:29:
> > > > > Congratulations, Lincoln!
> > > > >
> > > > > Best Regards
> > > > > Aleksandr
[RESULT][VOTE] FLIP-379: Dynamic source parallelism inference for batch jobs
Dear developers,

FLIP-379: Dynamic source parallelism inference for batch jobs [1] has been
accepted and voted through this thread [2].

The proposal received 6 approving binding votes and there is no
disapproval:

- Zhu Zhu (binding)
- Lijie Wang (binding)
- Rui Fan (binding)
- Etienne Chauchot (binding)
- Leonard Xu (binding)
- Jingsong Li (binding)

Thanks to all involved.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
[2] https://lists.apache.org/thread/g03m2r8dodz6gn8jgf36mvq60h1tsnqg

Best,
Xia
[VOTE] FLIP-379: Dynamic source parallelism inference for batch jobs
Hi everyone,

I'd like to start a vote on FLIP-379: Dynamic source parallelism inference
for batch jobs [1], which has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an objection
or not enough votes.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
[2] https://lists.apache.org/thread/ocftkqy5d2x4n58wzprgm5qqrzzkbmb8

Best Regards,
Xia
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Hi everyone, Thanks for all the comments! I will initiate the vote tomorrow if there is no further discussion. Best, Xia Leonard Xu 于2023年11月24日周五 18:50写道: > Thanks Xia and Zhu Zhu for driving this work, > > It will help unify the parallelism inference for all operators of batch > job, the updated FLIP looks good to me. > > Best, > Leonard > > > > 2023年11月24日 下午5:53,Xia Sun 写道: > > > > Hi all, > > Offline discussed with Zhu Zhu and Leonard Xu and we have reached the > > following three points of consensus: > > > > 1. Rename the interface method Context#getMaxSourceParallelism proposed > by > > the FLIP to Context#getParallelismInferenceUpperBound, to make the > meaning > > of the method clearer. See [1] for details. > > > > 2. We provide a more detailed explanation of the effective priority of > the > > dynamic source parallelism inference proposed by this FLIP and the order > of > > values for the upper bound of source parallelism. We also point out the > > current support and limitations of the AdaptiveBatchScheduler regarding > > source parallelism inference. See [2] for details. > > > > 3. This FLIP will only focus on the framework-level implementation and > will > > prioritize the implementation of FileSource as an example of the new > > interface proposed by the FLIP. The HiveSource, due to its existing > static > > parallelism dynamic inference, and changes in default values for > > configuration items such as `table.exec.hive.infer-source-parallelism`, > > requires a more detailed migration plan, as well as more comprehensive > > design and discussion. It is not suitable as part of this FLIP and needs > a > > separate FLIP. Therefore, we have removed the HiveSource part from this > > FLIP. > > > > Thanks again to everyone who participated in the discussion. > > Looking forward to your continued feedback. 
> > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-IntroduceDynamicParallelismInferenceinterfaceforSource > > [2] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-ConfigurationBehaviorChanges > > > > Best, > > Xia > > > > Leonard Xu wrote on Wed, Nov 22, 2023 at 18:37: > > > >> Thanks Xia for the reply, sorry for the late reply. > >> > >>> Thanks for pointing out the issue, the current wording does indeed seem > >> to > >>> be confusing. It involves the existing implementation of the > >>> AdaptiveBatchScheduler, where the dynamically inferred parallelism > cannot > >>> exceed the JobVertex's maxParallelism (which is typically set to either > >> the > >>> global default max parallelism or the user-specified JobVertex max > >>> parallelism), so the FLIP maintains the logic. I have modified the FLIP > >> to > >>> avoid confusion as much as possible. > >> > >> I didn't see the changed part in this FLIP, could you check it? > >> > >>> We can use Configuration::getOptional to check if the user has > configured > >>> the > >> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`. > >> > >> Using Readable#getOptional(ConfigOption option) makes sense to me. > >> > >>> As a follow-up task, we may have a dedicated discussion in the future > to > >>> see if we need to change the default value of > >>> `table.exec.hive.infer-source-parallelism` to false. Before then, users > >> can > >>> manually set `table.exec.hive.infer-source-parallelism` to false to > >> enable > >>> dynamic parallelism inference, and use > >>> `execution.batch.adaptive.auto-parallelism.default-source-parallelism` > to > >>> replace `table.exec.hive.infer-source-parallelism.max` as the > parallelism > >>> inference upper bound. 
I have updated both the FLIP's > >>> DynamicParallelismInference interface implementation and Migration Plan > >>> modules to illustrate this. > >> > >> In my opinion, moving HiveSource to a subsequent discussion is not OK, see > >> my explanation: HiveSource supporting dynamic source parallelism inference is > >> one part of the FLIP implementation, and it looks like we introduce a > >> configuration > >> `execution.batch.adaptive.auto-paralleli
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
ference is a useful feature for the batch > >>>> story. I've some comments about the current design. > >>>> > >>>> 1. How can users disable the parallelism inference if they want to use a fixed > >>>> source parallelism? They can configure a fixed parallelism in the table > layer > >>>> currently, as you explained above. > >>>> > >>>> 2. Could you explain the priority between the static parallelism set from the table > >>>> layer and the proposed dynamic source parallelism? And changing the > >> default > >>>> value of `table.exec.hive.infer-source-parallelism` as a sub-task does > not > >>>> resolve all cases, because other Sources can set their own parallelism > >> too. > >>>> > >>>> 3. The current design only works for batch jobs; the workflow for a streaming > >> job > >>>> may look like (1) infer parallelism for a streaming source like > >> Kafka, > >>>> (2) stop the job with a savepoint, (3) apply the new parallelism to the job, (4) > >>>> schedule the streaming job from the savepoint, which is totally different; > >> the > >>>> latter lacks a lot of infra in Flink, right? So, could we consider > >> the > >>>> boundedness info when designing the interface? Both FileSource and Hive > >> Source > >>>> offer streaming read ability; imagine this case: a Flink streaming Hive > >>>> Source should not apply the dynamic source parallelism even if it > >> implemented > >>>> the feature, as it is serving a streaming job. > >>>> > >>>> Best, > >>>> Leonard > >>>> > >>>> > >>>>> On Nov 1, 2023, at 6:21 PM, Xia Sun wrote: > >>>>> > >>>>> Thanks Lijie for the comments! > >>>>> 1. For Hive source, dynamic parallelism inference in batch scenarios > >> is a > >>>>> superset of static parallelism inference. As a follow-up task, we can > >>>>> consider changing the default value of > >>>>> 'table.exec.hive.infer-source-parallelism' to false. > >>>>> > >>>>> 2. I think that both dynamic parallelism inference and static > >> parallelism > >>>>> inference have their own use cases. 
Currently, for streaming sources > >> and > >>>>> other sources that are not sensitive to dynamic information, the > >> benefits > >>>>> of dynamic parallelism inference may not be significant. In such > cases, > >>>> we > >>>>> can continue to use static parallelism inference. > >>>>> > >>>>> Thanks, > >>>>> Xia > >>>>> > >>>>> Lijie Wang 于2023年11月1日周三 14:52写道: > >>>>> > >>>>>> Hi Xia, > >>>>>> > >>>>>> Thanks for driving this FLIP, +1 for the proposal. > >>>>>> > >>>>>> I have 2 questions about the relationship between static inference > and > >>>>>> dynamic inference: > >>>>>> > >>>>>> 1. AFAIK, currently the hive table source enable static inference by > >>>>>> default. In this case, which one (static vs dynamic) will take > effect > >> ? > >>>> I > >>>>>> think it would be better if we can point this out in FLIP > >>>>>> > >>>>>> 2. As you mentioned above, dynamic inference is the most ideal way, > so > >>>> do > >>>>>> we have plan to deprecate the static inference in the future? > >>>>>> > >>>>>> Best, > >>>>>> Lijie > >>>>>> > >>>>>> Zhu Zhu 于2023年10月31日周二 20:19写道: > >>>>>> > >>>>>>> Thanks for opening the FLIP and kicking off this discussion, Xia! > >>>>>>> The proposed changes make up an important missing part of the > dynamic > >>>>>>> parallelism inference of adaptive batch scheduler. > >>>>>>> > >>>>>>> Besides that, it is also one good step towards supporting dynamic > >>>>>>> parallelism inference for streaming sources, e.g. allowing Kafka > >>>>>>> sources to determine its parallelism automatically based on the > >>>>>>> number of partitions. > >>>>>>> > >>>>>>> +1 for the proposal. > >>>>>>> > >>>>>>> Thanks, > >>>>>>> Zhu > >>>>>>> > >>>>>>> Xia Sun 于2023年10月31日周二 16:01写道: > >>>>>>> > >>>>>>>> Hi everyone, > >>>>>>>> I would like to start a discussion on FLIP-379: Dynamic source > >>>>>>> parallelism > >>>>>>>> inference for batch jobs[1]. 
> >>>>>>>> > >>>>>>>> In general, there are three main ways to set source parallelism > for > >>>>>> batch > >>>>>>>> jobs: > >>>>>>>> (1) User-defined source parallelism. > >>>>>>>> (2) Connector static parallelism inference. > >>>>>>>> (3) Dynamic parallelism inference. > >>>>>>>> > >>>>>>>> Compared to manually setting parallelism, automatic parallelism > >>>>>> inference > >>>>>>>> is easier to use and can better adapt to varying data volumes each > >>>> day. > >>>>>>>> However, static parallelism inference cannot leverage runtime > >>>>>>> information, > >>>>>>>> resulting in inaccurate parallelism inference. Therefore, for > batch > >>>>>> jobs, > >>>>>>>> dynamic parallelism inference is the most ideal, but currently, > the > >>>>>>> support > >>>>>>>> for adaptive batch scheduler is not very comprehensive. > >>>>>>>> > >>>>>>>> Therefore, we aim to introduce a general interface that enables > the > >>>>>>>> adaptive batch scheduler to dynamically infer the source > parallelism > >>>> at > >>>>>>>> runtime. Please refer to the FLIP[1] document for more details > about > >>>>>> the > >>>>>>>> proposed design and implementation. > >>>>>>>> > >>>>>>>> I also thank Zhu Zhu and LiJie Wang for their suggestions during > the > >>>>>>>> pre-discussion. > >>>>>>>> Looking forward to your feedback and suggestions, thanks. > >>>>>>>> > >>>>>>>> [1] > >>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs > >>>>>>>> > >>>>>>>> Best regards, > >>>>>>>> Xia > >>>>>>>> > >>>>>>> > >>>>>> > >>>> > >>>> > >> > >> > >
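The boundedness concern raised in this thread (a streaming read of FileSource or HiveSource should not apply dynamic parallelism inference) can be pictured as a simple guard on `Source::getBoundedness()` before the runtime applies an inferred value. The sketch below is illustrative only and is not actual Flink scheduler code; only the enum values mirror Flink's real `org.apache.flink.api.connector.source.Boundedness`:

```java
// Illustrative sketch: models the boundedness guard discussed in the thread.
// Not actual Flink code; the class and method names here are hypothetical.
public class BoundednessGuard {

    // Mirrors org.apache.flink.api.connector.source.Boundedness.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    /**
     * Returns the parallelism to use: the dynamically inferred value only
     * when the source is bounded (a batch read) and no parallelism was set
     * (-1 by Flink's convention); otherwise the configured value, or a
     * stand-in default of 1 for an unset streaming source.
     */
    static int decideParallelism(Boundedness boundedness,
                                 int configuredParallelism,
                                 int dynamicallyInferred) {
        boolean unset = configuredParallelism == -1;
        if (unset && boundedness == Boundedness.BOUNDED) {
            return dynamicallyInferred; // batch job: apply dynamic inference
        }
        // streaming source, or user/static parallelism already set
        return unset ? 1 : configuredParallelism;
    }

    public static void main(String[] args) {
        System.out.println(decideParallelism(Boundedness.BOUNDED, -1, 8));
        System.out.println(decideParallelism(Boundedness.CONTINUOUS_UNBOUNDED, -1, 8));
        System.out.println(decideParallelism(Boundedness.BOUNDED, 4, 8));
    }
}
```

With this kind of guard, a `Source` implementing the new interface would be ignored by the inference logic whenever it serves a streaming job, which is the behavior Leonard asks for.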
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
; > the Source::getBoundedness() method can also be obtained when inferring > parallelism. > > +1 about the boundedness info part. > > Okay, let's come back to the batch world and discuss some details about > the current design: > > (1) About max source parallelism > The FLIP said: (1) Max source parallelism, which is calculated as the > minimum of the default source parallelism > (`execution.batch.adaptive.auto-parallelism.default-source-parallelism`) > and JobVertex#maxParallelism. If the default-source-parallelism is not set, > the global default parallelism is used as the default source parallelism. > > The 'Max source parallelism' is the information that the runtime offers to the > Source as a hint to infer the actual parallelism; a name with a max prefix > but calculated as a minimum value confuses me a lot, especially when I > read the HiveSource pseudocode: > > fileEnumerator.setMinNumSplits(maxSourceParallelism); > > Although I understand that naming is a complex topic in CS, could we > improve this method name a little? And, the > `execution.batch.adaptive.auto-parallelism.default-source-parallelism` > config has a default value of 1; how do we distinguish whether the user set it > or not, for example if the user happens to set the value 1? > > (2) About changing the default value of > 'table.exec.hive.infer-source-parallelism' > The FLIP said: As a follow-up task, we will consider changing the default > value of 'table.exec.hive.infer-source-parallelism' to false. > > No doubt that it's an API breaking change; for existing Hive users, the > migration path is not clear in this FLIP. For example, current users use the > split number to infer the source parallelism; after this FLIP, could we > give a recommended value of > `execution.batch.adaptive.auto-parallelism.default-source-parallelism`, or > explain how to set it, or do users not need to set anything at all? 
And the > replacement for migration should be added to > 'table.exec.hive.infer-source-parallelism's description when we propose to > change its default value, right? > > (3) [minor] About the HiveSource > The pseudocode shows: > > fileEnumerator.getInferredSourceParallelism(); > > IIRC, our public API FileEnumerator never offers such a method; introducing > getInferredSourceParallelism() is also one part of our FLIP? > > Best, > Leonard > > > > > > > Best regards, > > Xia > > > > Leonard Xu wrote on Wed, Nov 8, 2023 at 16:19: > > > >> Thanks Xia and Zhu Zhu for kicking off this discussion. > >> > >> The dynamic source parallelism inference is a useful feature for the batch > >> story. I've some comments about the current design. > >> > >> 1. How can users disable the parallelism inference if they want to use a fixed > >> source parallelism? They can configure a fixed parallelism in the table layer > >> currently, as you explained above. > >> > >> 2. Could you explain the priority between the static parallelism set from the table > >> layer and the proposed dynamic source parallelism? And changing the > default > >> value of `table.exec.hive.infer-source-parallelism` as a sub-task does not > >> resolve all cases, because other Sources can set their own parallelism > too. > >> > >> 3. The current design only works for batch jobs; the workflow for a streaming > job > >> may look like (1) infer parallelism for a streaming source like > Kafka, > >> (2) stop the job with a savepoint, (3) apply the new parallelism to the job, (4) > >> schedule the streaming job from the savepoint, which is totally different; > the > >> latter lacks a lot of infra in Flink, right? So, could we consider > the > >> boundedness info when designing the interface? Both FileSource and Hive > Source > >> offer streaming read ability; imagine this case: a Flink streaming Hive > >> Source should not apply the dynamic source parallelism even if it > implemented > >> the feature, as it is serving a streaming job. 
> >> > >> Best, > >> Leonard > >> > >> > >>> 2023年11月1日 下午6:21,Xia Sun 写道: > >>> > >>> Thanks Lijie for the comments! > >>> 1. For Hive source, dynamic parallelism inference in batch scenarios > is a > >>> superset of static parallelism inference. As a follow-up task, we can > >>> consider changing the default value of > >>> 'table.exec.hive.infer-source-parallelism' to false. > >>> > >>> 2. I think that both dynamic parallelism inference and static > parallelism > >>> inference have their own use cases. Currently, for streaming sources > and > >>> other sources
Re: [VOTE] FLIP-381: Deprecate configuration getters/setters that return/set complex Java objects
+1 (non-binding) Best, Xia Samrat Deb 于2023年11月13日周一 12:37写道: > +1 (non binding) > > Bests, > Samrat > > On Mon, 13 Nov 2023 at 9:10 AM, Yangze Guo wrote: > > > +1 (binding) > > > > Best, > > Yangze Guo > > > > On Mon, Nov 13, 2023 at 11:35 AM weijie guo > > wrote: > > > > > > +1(binding) > > > > > > Best regards, > > > > > > Weijie > > > > > > > > > Lijie Wang 于2023年11月13日周一 10:40写道: > > > > > > > +1 (binding) > > > > > > > > Best, > > > > Lijie > > > > > > > > Yuepeng Pan 于2023年11月10日周五 18:32写道: > > > > > > > > > +1(non-binding) > > > > > > > > > > Best, > > > > > Roc > > > > > > > > > > On 2023/11/10 03:58:10 Junrui Lee wrote: > > > > > > Hi everyone, > > > > > > > > > > > > Thank you to everyone for the feedback on FLIP-381: Deprecate > > > > > configuration > > > > > > getters/setters that return/set complex Java objects[1] which has > > been > > > > > > discussed in this thread [2]. > > > > > > > > > > > > I would like to start a vote for it. The vote will be open for at > > least > > > > > 72 > > > > > > hours (excluding weekends) unless there is an objection or not > > enough > > > > > votes. > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278464992 > > > > > > [2] > > https://lists.apache.org/thread/y5owjkfxq3xs9lmpdbl6d6jmqdgbjqxo > > > > > > > > > > > > > > > > > >
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Thanks Leonard for the feedback and sorry for my late response. > `How can users disable the parallelism inference if they want to use a fixed source parallelism?` > `Could you explain the priority between the static parallelism set from the table layer and the proposed dynamic source parallelism?` From the user's perspective, if the user specifies a fixed parallelism for the source, dynamic source parallelism inference will be automatically disabled. From the perspective of priority: the user's specified parallelism > static parallelism inference > dynamic parallelism inference. This is because dynamic source parallelism inference takes effect at the runtime stage, and its validity conditions are: (1) the current ExecutionGraph is a dynamic graph, and (2) the parallelism of the source vertex is not specified (that is, the parallelism is -1). > `the workflow for streaming job may look like ... which is totally different, the latter lacks a lot of infra in Flink, right?` Indeed, as of now, dynamic parallelism inference is exclusively for batch jobs, so it only takes into account the necessary information for batch scenarios. In the future, when we introduce support for automatic parallelism inference in streaming jobs, we can include the required information for streaming jobs, to avoid unnecessarily complicating the current design. Moreover, the workflow you mentioned seems a bit complicated. Our current idea is to perform the parallelism inference during the initialization phase of streaming jobs and proceed to schedule the entire job once the source parallelism is determined. This process will naturally occur during job startup, eliminating the need for additional restarts. > `So, could we consider the boundedness info when designing the interface? 
Both FileSource and Hive Source offer streaming read ability; imagine this case: a Flink streaming Hive Source should not apply the dynamic source parallelism even if it implemented the feature, as it is serving a streaming job.` Thanks for your feedback, it is really a good input. Currently, the dynamic parallelism inference logic is only triggered in batch jobs, so the logic will not be called in streaming jobs. In the future, if streaming jobs also support runtime parallelism inference, then theoretically, sources can no longer be distinguished between streaming and batch jobs at the runtime stage. In addition, since the new interface is implemented together with the Source interface, the Source::getBoundedness() method can also be consulted when inferring parallelism. Best regards, Xia Leonard Xu wrote on Wed, Nov 8, 2023 at 16:19: > Thanks Xia and Zhu Zhu for kicking off this discussion. > > The dynamic source parallelism inference is a useful feature for the batch > story. I've some comments about the current design. > > 1. How can users disable the parallelism inference if they want to use a fixed > source parallelism? They can configure a fixed parallelism in the table layer > currently, as you explained above. > > 2. Could you explain the priority between the static parallelism set from the table > layer and the proposed dynamic source parallelism? And changing the default > value of `table.exec.hive.infer-source-parallelism` as a sub-task does not > resolve all cases, because other Sources can set their own parallelism too. > > 3. The current design only works for batch jobs; the workflow for a streaming job > may look like (1) infer parallelism for a streaming source like Kafka, > (2) stop the job with a savepoint, (3) apply the new parallelism to the job, (4) > schedule the streaming job from the savepoint, which is totally different; the > latter lacks a lot of infra in Flink, right? So, could we consider the > boundedness info when designing the interface? 
Both FileSource and Hive Source > offer streaming read ability; imagine this case: a Flink streaming Hive > Source should not apply the dynamic source parallelism even if it implemented > the feature, as it is serving a streaming job. > > Best, > Leonard > > > > On Nov 1, 2023, at 6:21 PM, Xia Sun wrote: > > > > Thanks Lijie for the comments! > > 1. For Hive source, dynamic parallelism inference in batch scenarios is a > > superset of static parallelism inference. As a follow-up task, we can > > consider changing the default value of > > 'table.exec.hive.infer-source-parallelism' to false. > > > > 2. I think that both dynamic parallelism inference and static parallelism > > inference have their own use cases. Currently, for streaming sources and > > other sources that are not sensitive to dynamic information, the benefits > > of dynamic parallelism inference may not be significant. In such cases, > we > > can continue to use static parallelism inference. > > > > Thanks, > > Xia > > > > Lijie Wang wrote on Wed, Nov 1, 2023 at 14:52: > > > >> Hi Xia, > >&
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Thanks Lijie for the comments! 1. For Hive source, dynamic parallelism inference in batch scenarios is a superset of static parallelism inference. As a follow-up task, we can consider changing the default value of 'table.exec.hive.infer-source-parallelism' to false. 2. I think that both dynamic parallelism inference and static parallelism inference have their own use cases. Currently, for streaming sources and other sources that are not sensitive to dynamic information, the benefits of dynamic parallelism inference may not be significant. In such cases, we can continue to use static parallelism inference. Thanks, Xia Lijie Wang 于2023年11月1日周三 14:52写道: > Hi Xia, > > Thanks for driving this FLIP, +1 for the proposal. > > I have 2 questions about the relationship between static inference and > dynamic inference: > > 1. AFAIK, currently the hive table source enable static inference by > default. In this case, which one (static vs dynamic) will take effect ? I > think it would be better if we can point this out in FLIP > > 2. As you mentioned above, dynamic inference is the most ideal way, so do > we have plan to deprecate the static inference in the future? > > Best, > Lijie > > Zhu Zhu 于2023年10月31日周二 20:19写道: > > > Thanks for opening the FLIP and kicking off this discussion, Xia! > > The proposed changes make up an important missing part of the dynamic > > parallelism inference of adaptive batch scheduler. > > > > Besides that, it is also one good step towards supporting dynamic > > parallelism inference for streaming sources, e.g. allowing Kafka > > sources to determine its parallelism automatically based on the > > number of partitions. > > > > +1 for the proposal. > > > > Thanks, > > Zhu > > > > Xia Sun 于2023年10月31日周二 16:01写道: > > > > > Hi everyone, > > > I would like to start a discussion on FLIP-379: Dynamic source > > parallelism > > > inference for batch jobs[1]. 
> > > > > > In general, there are three main ways to set source parallelism for > batch > > > jobs: > > > (1) User-defined source parallelism. > > > (2) Connector static parallelism inference. > > > (3) Dynamic parallelism inference. > > > > > > Compared to manually setting parallelism, automatic parallelism > inference > > > is easier to use and can better adapt to varying data volumes each day. > > > However, static parallelism inference cannot leverage runtime > > information, > > > resulting in inaccurate parallelism inference. Therefore, for batch > jobs, > > > dynamic parallelism inference is the most ideal, but currently, the > > support > > > for adaptive batch scheduler is not very comprehensive. > > > > > > Therefore, we aim to introduce a general interface that enables the > > > adaptive batch scheduler to dynamically infer the source parallelism at > > > runtime. Please refer to the FLIP[1] document for more details about > the > > > proposed design and implementation. > > > > > > I also thank Zhu Zhu and LiJie Wang for their suggestions during the > > > pre-discussion. > > > Looking forward to your feedback and suggestions, thanks. > > > > > > [1] > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs > > > > > > Best regards, > > > Xia > > > > > >
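The effective priority described earlier in this thread (user-specified parallelism first, then connector static inference, then dynamic inference, which applies only when the vertex parallelism is still -1 on a dynamic graph) can be sketched as a small helper. This is purely illustrative and not actual Flink code; the method and parameter names are hypothetical:

```java
// Illustrative helper modeling the effective priority discussed in the
// thread; not actual Flink scheduler code. Following Flink's convention,
// -1 means "parallelism not set".
public class SourceParallelismPriority {

    static int resolve(int userSpecified,
                       int staticInferred,
                       int dynamicInferred,
                       boolean isDynamicGraph) {
        if (userSpecified > 0) {
            return userSpecified;      // 1) user-specified parallelism wins
        }
        if (staticInferred > 0) {
            return staticInferred;     // 2) connector static inference
        }
        if (isDynamicGraph && dynamicInferred > 0) {
            return dynamicInferred;    // 3) dynamic inference: only when the
                                       //    vertex parallelism is still -1 on
                                       //    a dynamic (adaptive batch) graph
        }
        return -1;                     // otherwise left to scheduler defaults
    }

    public static void main(String[] args) {
        System.out.println(resolve(4, 16, 32, true));   // user-specified wins
        System.out.println(resolve(-1, 16, 32, true));  // static inference
        System.out.println(resolve(-1, -1, 32, true));  // dynamic inference
    }
}
```

The key point mirrored here is that dynamic inference never overrides a parallelism that was already fixed before runtime, which is why it can coexist with the existing static-inference path.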
[DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Hi everyone, I would like to start a discussion on FLIP-379: Dynamic source parallelism inference for batch jobs[1]. In general, there are three main ways to set source parallelism for batch jobs: (1) User-defined source parallelism. (2) Connector static parallelism inference. (3) Dynamic parallelism inference. Compared to manually setting parallelism, automatic parallelism inference is easier to use and can better adapt to varying data volumes each day. However, static parallelism inference cannot leverage runtime information, resulting in inaccurate parallelism inference. Therefore, for batch jobs, dynamic parallelism inference is the most ideal, but currently, support for it in the adaptive batch scheduler is not very comprehensive. Therefore, we aim to introduce a general interface that enables the adaptive batch scheduler to dynamically infer the source parallelism at runtime. Please refer to the FLIP[1] document for more details about the proposed design and implementation. I also thank Zhu Zhu and LiJie Wang for their suggestions during the pre-discussion. Looking forward to your feedback and suggestions, thanks. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs Best regards, Xia
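To make the proposal concrete, here is a rough Java sketch of what such a dynamic-inference hook could look like. This is illustrative only: the type and method names (`DynamicParallelismInference`, `inferParallelism`, `getParallelismInferenceUpperBound`, `getDataVolumePerTask`) are assumptions drawn from this thread, and the authoritative definitions live in the FLIP-379 document; the `FileSourceLike` implementation is entirely hypothetical.

```java
// Sketch of the interface shape discussed in this thread; names should be
// checked against the FLIP-379 document. Not actual Flink code.
public class DynamicInferenceSketch {

    interface DynamicParallelismInference {
        int inferParallelism(Context context);

        interface Context {
            // upper bound, e.g. min(default-source-parallelism, maxParallelism)
            int getParallelismInferenceUpperBound();

            // hint for how many bytes each subtask should process
            long getDataVolumePerTask();
        }
    }

    // Hypothetical file-based source: one subtask per data chunk, capped by
    // the upper bound the runtime provides.
    static class FileSourceLike implements DynamicParallelismInference {
        private final long totalBytes;

        FileSourceLike(long totalBytes) {
            this.totalBytes = totalBytes;
        }

        @Override
        public int inferParallelism(Context ctx) {
            long perTask = ctx.getDataVolumePerTask();
            // ceiling division: enough subtasks to cover all bytes
            int wanted = (int) Math.max(1, (totalBytes + perTask - 1) / perTask);
            return Math.min(wanted, ctx.getParallelismInferenceUpperBound());
        }
    }

    public static void main(String[] args) {
        DynamicParallelismInference.Context ctx =
                new DynamicParallelismInference.Context() {
                    public int getParallelismInferenceUpperBound() { return 8; }
                    public long getDataVolumePerTask() { return 128L; }
                };
        // 1000 bytes / 128 per task -> 8 subtasks, within the upper bound
        System.out.println(new FileSourceLike(1000L).inferParallelism(ctx));
    }
}
```

The essential property is that the source only proposes a parallelism from runtime information (here, total data size); the scheduler still enforces the upper bound, so inference can never exceed the configured limits.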
Re: [VOTE] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
+1 (non-binding) Best Regards, Xia yuxia 于2023年6月25日周日 09:23写道: > +1 (binding) > Thanks Lijie driving it. > > Best regards, > Yuxia > > - 原始邮件 - > 发件人: "Yuepeng Pan" > 收件人: "dev" > 发送时间: 星期六, 2023年 6 月 24日 下午 9:06:53 > 主题: Re:[VOTE] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs > > +1 (non-binding) > > Thanks, > Yuepeng Pan > > > > > > > > > > > > > > > > At 2023-06-23 23:49:53, "Lijie Wang" wrote: > >Hi all, > > > >Thanks for all the feedback about the FLIP-324: Introduce Runtime Filter > >for Flink Batch Jobs[1]. This FLIP was discussed in [2]. > > > >I'd like to start a vote for it. The vote will be open for at least 72 > >hours (until June 29th 12:00 GMT) unless there is an objection or > >insufficient votes. > > > >[1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs > >[2] https://lists.apache.org/thread/mm0o8fv7x7k13z11htt88zhy7lo8npmg > > > >Best, > >Lijie >
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi Yuxin, Thanks for creating this FLIP! I'm a Flink user, and in our internal scenario we use colocation technology to run Flink jobs and online services together on the same machine. We found that Flink jobs are occasionally affected by other non-Flink jobs (e.g., if the host disk space is full, that will result in a 'No space left on device' error in Flink jobs). This FLIP will really help us benefit from hybrid shuffle without worrying about the insufficient disk space problem. And I also have a few questions. 1. If the same subpartition spans multiple different tiers, how do we keep the order of segments between different storage tiers (if necessary)? 2. In the process of writing to the local disk for a subpartition, what will happen if the disk space is found to be full? Will it report an error or automatically transfer to remote storage? 3. For remote storage, I noticed that it uses direct reading, which is different from the other two; will the switching between different tiers bring overhead or waiting? In addition, compared to Flink RSS, which optimizes data compression and small-file merging to improve throughput and relieve file system pressure, can the object storage system meet the performance requirements and concurrent access challenges of large-scale batch jobs (parallelism > 1)? Thanks, Xia Zhu Zhu wrote on Fri, Mar 10, 2023 at 16:44: > Hi Yuxin, > > Thanks for creating this FLIP! > The idea of tiered storage looks good. Instead of choosing one from > multiple storages, it can help to balance between performance, cost and > stability. It also has the potential to adaptively select proper tiers > according to more runtime information, to achieve better performance > and ease of use. > > I have a question about the tier finding of data reading. In the FLIP > it proposes that the Read Client asks each storage tier whether a > given segment exists in it, from higher priority tiers to lower priority > ones. 
I'm a bit concerned about the cost of it, especially when data > are written to low priority tiers. Do you have any evaluation of it? > Is it possible to let the Reader Client know the location of the next > segment when it has finished reading one segment? Or maybe just let it > know whether the next segment is located in the same tier, if we can > have the assumption that tier changing would not be very frequent. > > Thanks, > Zhu > > Weihua Hu wrote on Fri, Mar 10, 2023 at 11:52: > > > > Thanks Yuxin for your explanation. > > > > That sounds reasonable. Looking forward to the new shuffle. > > > > > > Best, > > Weihua > > > > > > On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan > wrote: > > > > > Hi, Weihua, > > > Thanks for the questions and the ideas. > > > > > > > 1. How many performance regressions would there be if we only > > > used remote storage? > > > > > > The new architecture can support using remote storage only, but this > > > FLIP's target is to improve job stability, and the change in this FLIP is > > > already quite complex, so the goal of the first version is to update > > > Hybrid Shuffle to the new architecture and support remote storage as > > > a supplement. The performance of this version is not the first > priority, > > > so we haven't tested the performance of using only remote storage. > > > If there are indeed regressions, we will keep optimizing the > performance > > > of remote storage until using only remote storage is viable in a > > > production environment. > > > > > > > 2. Shall we move the local data to remote storage if the producer has > > > been finished for a long time? > > > > > > I agree that it is a good idea, which can release TaskManager resources > > > more timely. But moving data from the TM local disk to remote storage needs > > > more detailed discussion and design, and it will be easier to implement > based > > > on the new architecture. 
Considering the complexity, the target focus, > and > > > the iteration cycle of the FLIP, we decided that the details will not be > > > included > > > in the first version. We will extend and implement them in subsequent > > > versions. > > > > > > Best, > > > Yuxin > > > > > > > > > Weihua Hu wrote on Thu, Mar 9, 2023 at 11:22: > > > > Hi, Yuxin > > > > > > > > Thanks for driving this FLIP. > > > > > > > > The remote storage shuffle could improve the stability of batch jobs. > > > > > > > > In our internal scenario, we use a hybrid cluster to run both > > > > streaming (high priority) > > > > and batch jobs (low priority). When there are not enough resources (such as > > > > CPU usage > > > > reaching a threshold), the batch containers will be evicted, so this will > > > > cause some re-runs > > > > of batch tasks. > > > > > > > > It would be a great help if remote storage could address this, so I > > > > have a few questions. > > > > > > > > 1. How many performance regressions would there be if we only used remote > > > > storage? > > > > > > > > 2. In the current design, the shuffle data segment will