Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Hi All,

Thanks for all the feedback on this FLIP. Since there are no other concerns, the FLIP-235 discussion is over. I will open a vote today.

Best regards,

Weijie

Xintong Song wrote on Wed, May 25, 2022 at 22:17:

> Ok, I think we are on the same page. I'm aware of
> ExecutionConfig#setExecutionMode, which sets the data exchanging mode at
> the job scope.
>
> Best,
>
> Xintong
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Ok, I think we are on the same page. I'm aware of ExecutionConfig#setExecutionMode, which sets the data exchanging mode at the job scope.

Best,

Xintong

On Wed, May 25, 2022 at 9:50 PM Chesnay Schepler wrote:

> You can influence it to some extent via ExecutionConfig#setExecutionMode.
> You can, for example, force all shuffles to use blocking exchanges.
>
> I'm not proposing an API that would allow this to be set per edge.
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
You can influence it to some extent via ExecutionConfig#setExecutionMode. You can, for example, force all shuffles to use blocking exchanges.

I'm not proposing an API that would allow this to be set per edge.

On 25/05/2022 15:23, Xintong Song wrote:

> In general, I agree with you about targeting jobs with no/few blocking
> exchanges for fine-grained recovery. The only problem is, correct me if I'm
> wrong, that users currently cannot control the data exchanging mode of a
> specific edge. I'm not aware of such APIs.
>
> As a first step, I'd prefer excluding this from the scope of this FLIP.
>
> Best,
>
> Xintong
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
In general, I agree with you about targeting jobs with no/few blocking exchanges for fine-grained recovery. The only problem is, correct me if I'm wrong, that users currently cannot control the data exchanging mode of a specific edge. I'm not aware of such APIs.

As a first step, I'd prefer excluding this from the scope of this FLIP.

Best,

Xintong

On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler wrote:

> Yes; but that's also a limitation of the current fine-grained recovery.
>
> My suggestion was primarily aimed at jobs that have no/few blocking
> exchanges, where users would currently have to explicitly configure
> additional blocking exchanges to really get something out of
> fine-grained recovery (at the expense of e2e job duration).
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Yes; but that's also a limitation of the current fine-grained recovery.

My suggestion was primarily aimed at jobs that have no/few blocking exchanges, where users would currently have to explicitly configure additional blocking exchanges to really get something out of fine-grained recovery (at the expense of e2e job duration).

On 25/05/2022 14:47, Xintong Song wrote:

>> Will this also allow spilling everything to disk while also forwarding
>> data to the next task?
>
> Yes, as long as the downstream task is started, this always forwards the
> data, even while spilling everything.
>
>> This would allow us to improve fine-grained recovery by no longer being
>> constrained to pipelined regions.
>
> I think it helps prevent restarts of the upstreams for a failed task,
> but not the downstreams. Because there's no guarantee that a restarted task
> will produce exactly the same data (in terms of order) as the previous
> execution, downstreams cannot resume consuming the data.
>
> Best,
>
> Xintong
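
To make the point about blocking exchanges and fine-grained recovery concrete, here is a minimal sketch in Python. It is a toy model, not Flink code: the function names `pipelined_regions` and `tasks_to_restart` and the four-task job are hypothetical, and the model assumes a failure restarts exactly the failed task's pipelined region (real region failover may also restart consumers of that region's results and producers of lost results).

```python
from collections import defaultdict


def pipelined_regions(tasks, edges):
    """Group tasks into regions connected by pipelined edges only.

    edges: iterable of (upstream, downstream, mode) with mode being
    "pipelined" or "blocking". Blocking edges separate regions.
    """
    neighbours = defaultdict(set)
    for up, down, mode in edges:
        if mode == "pipelined":
            neighbours[up].add(down)
            neighbours[down].add(up)

    regions, seen = [], set()
    for task in tasks:
        if task in seen:
            continue
        # Depth-first walk that follows pipelined edges in both directions.
        stack, region = [task], []
        while stack:
            current = stack.pop()
            if current in seen:
                continue
            seen.add(current)
            region.append(current)
            stack.extend(neighbours[current] - seen)
        regions.append(sorted(region))
    return regions


def tasks_to_restart(failed, tasks, edges):
    """Toy assumption: a failure restarts the failed task's whole region."""
    for region in pipelined_regions(tasks, edges):
        if failed in region:
            return region
    raise ValueError(f"unknown task: {failed}")


# Hypothetical job: source -> map -> join -> sink
tasks = ["source", "map", "join", "sink"]
all_pipelined = [
    ("source", "map", "pipelined"),
    ("map", "join", "pipelined"),
    ("join", "sink", "pipelined"),
]
with_blocking = [
    ("source", "map", "pipelined"),
    ("map", "join", "blocking"),  # explicitly configured blocking exchange
    ("join", "sink", "pipelined"),
]

print(tasks_to_restart("sink", tasks, all_pipelined))
print(tasks_to_restart("sink", tasks, with_blocking))
```

With only pipelined edges the whole job is one region, so a failing sink restarts every task. Adding one blocking exchange splits the job into two regions and the restart is limited to `join` and `sink`, which is the benefit (and the e2e-duration cost) discussed above.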
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
> Will this also allow spilling everything to disk while also forwarding
> data to the next task?

Yes, as long as the downstream task is started, this always forwards the data, even while spilling everything.

> This would allow us to improve fine-grained recovery by no longer being
> constrained to pipelined regions.

I think it helps prevent restarts of the upstreams for a failed task, but not the downstreams. Because there's no guarantee that a restarted task will produce exactly the same data (in terms of order) as the previous execution, downstreams cannot resume consuming the data.

Best,

Xintong

On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler wrote:

> Will this also allow spilling everything to disk while also forwarding
> data to the next task?
>
> This would allow us to improve fine-grained recovery by no longer being
> constrained to pipelined regions.
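
The ordering argument can be illustrated with a small Python sketch. Everything in it is hypothetical (the function `resume_by_offset`, the record names, and the idea that a consumer resumes by offset); it only shows why a downstream task that has already consumed part of the first attempt cannot simply continue reading the output of a restarted upstream task when the record order is not guaranteed to be the same.

```python
def resume_by_offset(first_attempt, second_attempt, consumed):
    """Records a downstream task ends up with if it keeps the first
    `consumed` records from the original attempt and then resumes
    reading the re-produced output at the same offset."""
    return first_attempt[:consumed] + second_attempt[consumed:]


# Same records, but the restarted upstream emits them in a different order.
first_attempt = ["a", "b", "c", "d"]
second_attempt = ["b", "d", "a", "c"]

seen = resume_by_offset(first_attempt, second_attempt, consumed=2)
duplicates = sorted({r for r in seen if seen.count(r) > 1})
missing = sorted(set(first_attempt) - set(seen))

print(seen)        # ['a', 'b', 'a', 'c']
print(duplicates)  # ['a']
print(missing)     # ['d']
```

The resumed consumer sees `a` twice and never sees `d`, so in this model the only safe option is to restart the downstream task and let it consume the new output from the beginning.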
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Will this also allow spilling everything to disk while also forwarding data to the next task?

This would allow us to improve fine-grained recovery by no longer being constrained to pipelined regions.

On 25/05/2022 05:55, weijie guo wrote:

> Hi All,
> Thank you for your attention and feedback.
> Do you have any other comments? If there are no other questions, I'll start the vote on FLIP-235 tomorrow.
>
> Best regards,
>
> Weijie
>
> Aitozi wrote on Fri, May 20, 2022 at 13:22:
>
>> Hi Xintong,
>> Thanks for your detailed explanation. I misunderstood the spill behavior at first glance; I get your point now. I think it will be a good addition to the current execution modes.
>> Looking forward to it :)
>>
>> Best,
>> Aitozi
>>
>> Xintong Song wrote on Fri, May 20, 2022 at 12:26:
>>
>>> Hi Aitozi,
>>>
>>>> In which case we can use the hybrid shuffle mode
>>>
>>> Just to directly answer this question, in addition to Weijie's explanations: for batch workloads, if you want the workload to take advantage of as many resources as are available, which may range from a single slot to as many slots as there are tasks, you may consider hybrid shuffle mode. Admittedly, this may not always be wanted, e.g., users may not want to execute a job if there are too few resources available, or may not want a job to take too many of the cluster resources. That's why we propose hybrid shuffle as an additional option for batch users, rather than a replacement for Pipelined or Blocking mode.
>>>
>>>> So you mean the hybrid shuffle mode will limit its usage to the bounded source, Right ?
>>>
>>> Yes.
>>>
>>>> One more question, with the bounded data and partly of the stage is running in the Pipelined shuffle mode, what will be the behavior of the task failure, Is the checkpoint enabled for these running stages or will it re-run after the failure?
>>>
>>> There are no checkpoints. The failover behavior depends on the spilling strategy.
>>> - In the first version, we only consider a selective spilling strategy, which spills as little data as possible to disk. This means that in case of failover, upstream tasks need to be restarted to reproduce the complete intermediate results.
>>> - An alternative strategy we may introduce in the future, if needed, is to spill the complete intermediate results. That avoids restarting upstream tasks in case of failover, because the produced intermediate results can be re-consumed, at the cost of more disk IO load.
>>> With both strategies, the trade-off between failover cost and IO load is for the user to decide. This is also discussed in the MemoryDataManager section of the FLIP.
>>>
>>> Best,
>>>
>>> Xintong
>>>
>>> On Fri, May 20, 2022 at 12:10 PM Aitozi wrote:
>>>
>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode will limit its usage to bounded sources, right?
>>>> One more question: with bounded data and part of the stages running in Pipelined shuffle mode, what will the behavior be on task failure? Is checkpointing enabled for these running stages, or will they re-run after the failure?
>>>>
>>>> Best,
>>>> Aitozi
>>>>
>>>> weijie guo wrote on Fri, May 20, 2022 at 10:45:
>>>>
>>>>> Hi, Aitozi:
>>>>>
>>>>> Thank you for the feedback!
>>>>> Here are some of my thoughts on your questions.
>>>>>
>>>>>> 1.If there is an unbounded data source, but only have resource to schedule the first stage, will it bring the big burden to the disk/shuffle service which will occupy all the resource I think.
>>>>>
>>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job scenario, so there is no problem of unbounded data sources. Secondly, if you consider the streaming scenario, I think Pipelined Shuffle should still be the best choice at present. For an unbounded data stream, it is not meaningful to only run some stages.
>>>>>
>>>>>> 2. Which kind of job will benefit from the hybrid shuffle mode. In other words, In which case we can use the hybrid shuffle mode:
>>>>>
>>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid shuffle mode can effectively utilize cluster resources and avoid some unnecessary disk IO overhead. For OLAP scenarios, which are characterized by a large number of concurrently submitted short batch jobs, hybrid shuffle can solve the scheduling deadlock problem of pipelined shuffle and achieve similar performance.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Weijie
>>>>>
>>>>> Aitozi wrote on Fri, May 20, 2022 at 08:05:
>>>>>
>>>>>> Hi Weijie:
>>>>>>
>>>>>> Thanks for the nice FLIP. I have a couple of questions about this:
>>>>>>
>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the available resources. If there is an unbounded data source, but there are only enough resources to schedule the first stage, I think it will put a big burden on the disk/shuffle service, which will occupy all the resources.
>>>>>>
>>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode? In other words, in which cases can we use the hybrid shuffle mode:
>>>>>> - For batch jobs that want to use more resources to reduce the e2e time?
>>>>>> - Or for streaming jobs which may lack resources temporarily?
>>>>>> - Or for OLAP jobs which will try to make the best use of available resources, as you mentioned, to finish the query?
>>>>>> Just want to know the typical use case for the Hybrid shuffle mode :)
>>>>>>
>>>>>> Best,
>>>>>> Ai
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Hi All, Thank you for your attention and feedback. Do you have any other comments? If there are no other questions, I'll vote on FLIP-235 tomorrow. Best regards, Weijie
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Hi Xintong, Thanks for your detailed explanation. I misunderstood the spill behavior at first glance; I get your point now. I think it will be a good addition to the current execution mode. Looking forward to it :) Best, Aitozi
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Hi Aitozi,

> In which case we can use the hybrid shuffle mode

Just to directly answer this question, in addition to Weijie's explanations: for a batch workload, if you want it to take advantage of as many resources as are available, ranging from a single slot to as many slots as there are tasks in total, you may consider hybrid shuffle mode. Admittedly, this may not always be wanted; e.g., users may not want to execute a job if there are too few resources available, or may not want a job to take too many of the cluster's resources. That's why we propose hybrid shuffle as an additional option for batch users, rather than a replacement for Pipelined or Blocking mode.

> So you mean the hybrid shuffle mode will limit its usage to the bounded
> source, Right ?

Yes.

> One more question, with the bounded data and partly of the stage is running
> in the Pipelined shuffle mode, what will be the behavior of the task
> failure, Is the checkpoint enabled for these running stages or will it
> re-run after the failure?

There are no checkpoints. The failover behavior depends on the spilling strategy.
- In the first version, we only consider a selective spilling strategy, which spills as little data as possible to disk. This means that in case of failover, upstream tasks need to be restarted to reproduce the complete intermediate results.
- An alternative strategy we may introduce in the future, if needed, is to spill the complete intermediate results. That avoids restarting upstream tasks in case of failover, because the produced intermediate results can be re-consumed, at the cost of more disk IO load.
With both strategies, the trade-off between failover cost and IO load is for the user to decide. This is also discussed in the MemoryDataManager section of the FLIP.

Best,

Xintong
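To make the trade-off between the two spilling strategies a bit more concrete, here is a minimal sketch in Java. It is not Flink code: the class `SpillingStrategySketch`, the `Strategy` enum, and the buffer-counting model are all assumptions made purely for illustration. The only thing taken from the thread is the idea that selective spilling writes as little as possible to disk (here modeled as: buffers consumed directly from memory are never spilled), while full spilling writes the complete intermediate result.

```java
/** Illustrative model only; none of these names exist in Flink. */
final class SpillingStrategySketch {

    /** The two strategies mentioned in the thread. */
    enum Strategy { SELECTIVE, FULL }

    /** What one upstream task ends up with after producing its result. */
    static final class Outcome {
        final int buffersSpilled;
        final boolean upstreamRestartOnFailover;

        Outcome(int buffersSpilled, boolean upstreamRestartOnFailover) {
            this.buffersSpilled = buffersSpilled;
            this.upstreamRestartOnFailover = upstreamRestartOnFailover;
        }
    }

    /**
     * @param totalBuffers buffers produced by the upstream task
     * @param consumedFromMemory buffers the downstream task read straight from memory
     *     (hypothetical input; in a real system this depends on runtime behavior)
     */
    static Outcome produce(Strategy strategy, int totalBuffers, int consumedFromMemory) {
        if (totalBuffers < 0 || consumedFromMemory < 0 || consumedFromMemory > totalBuffers) {
            throw new IllegalArgumentException("need 0 <= consumedFromMemory <= totalBuffers");
        }
        if (strategy == Strategy.FULL) {
            // Everything is on disk, so a restarted consumer can re-read the result.
            return new Outcome(totalBuffers, false);
        }
        // Selective: only buffers that were not consumed from memory reach the disk.
        int spilled = totalBuffers - consumedFromMemory;
        // If anything is missing on disk, the upstream task must reproduce it.
        return new Outcome(spilled, spilled < totalBuffers);
    }

    public static void main(String[] args) {
        Outcome selective = produce(Strategy.SELECTIVE, 100, 70);
        Outcome full = produce(Strategy.FULL, 100, 70);
        System.out.println("selective: spilled=" + selective.buffersSpilled
                + ", restartUpstream=" + selective.upstreamRestartOnFailover);
        System.out.println("full: spilled=" + full.buffersSpilled
                + ", restartUpstream=" + full.upstreamRestartOnFailover);
    }
}
```

In this toy model, selective spilling trades 70 fewer disk writes for an upstream restart on failover, which is the failover-cost versus IO-load trade-off described above.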
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Thanks Weijie for your answer. So you mean hybrid shuffle mode is limited to bounded sources, right?
One more question: with bounded data and part of the stages running in Pipelined shuffle mode, what happens on task failure? Is checkpointing enabled for the running stages, or will they re-run after the failure?

Best,
Aitozi
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Hi, Aitozi:

Thank you for the feedback!
Here are some of my thoughts on your questions.

>>> 1.If there is an unbounded data source, but only have resource to schedule the first stage, will it bring the big burden to the disk/shuffle service which will occupy all the resource I think.

First of all, Hybrid Shuffle Mode is oriented to the batch job scenario, so the problem of unbounded data sources does not arise. Secondly, for the streaming scenario, I think Pipelined Shuffle is still the best choice at present. For an unbounded data stream, it is not meaningful to run only some of the stages.

>>> 2. Which kind of job will benefit from the hybrid shuffle mode. In other words, In which case we can use the hybrid shuffle mode:

Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid shuffle mode can utilize cluster resources effectively and avoid some unnecessary disk IO overhead. For OLAP scenarios, which are characterized by a large number of concurrently submitted short batch jobs, hybrid shuffle can solve the scheduling deadlock problem of pipelined shuffle while achieving similar performance.

Best regards,

Weijie
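The scheduling deadlock mentioned in Weijie's reply above can be illustrated with a toy model. The sketch below is a deliberate simplification and not Flink's scheduler: the class `SchedulingDeadlockSketch`, the round-robin slot hand-out, and the rule that a hybrid-shuffle job can start as soon as it holds a single slot are all hypothetical. It only assumes that a job whose tasks are connected by pipelined shuffle cannot run until it holds slots for all of them.

```java
/** Toy model of slot contention between concurrent jobs; not Flink code. */
final class SchedulingDeadlockSketch {

    /**
     * @param requiresAllSlots true models pipelined shuffle (all tasks must be deployed together),
     *     false models hybrid shuffle (a job may start with whatever slots it gets)
     * @param clusterSlots total number of slots in the cluster
     * @param slotsNeededPerJob slots each job would need to run all of its tasks at once
     * @return true if at least one job can start running
     */
    static boolean canMakeProgress(boolean requiresAllSlots, int clusterSlots, int[] slotsNeededPerJob) {
        int[] held = new int[slotsNeededPerJob.length];
        int free = clusterSlots;
        boolean grantedInRound = true;
        // Hand out free slots one at a time, round-robin, until none are left or nobody wants more.
        while (free > 0 && grantedInRound) {
            grantedInRound = false;
            for (int job = 0; job < held.length && free > 0; job++) {
                if (held[job] < slotsNeededPerJob[job]) {
                    held[job]++;
                    free--;
                    grantedInRound = true;
                }
            }
        }
        for (int job = 0; job < held.length; job++) {
            boolean hasAllSlots = held[job] == slotsNeededPerJob[job];
            boolean hasAnySlot = held[job] > 0;
            if (requiresAllSlots ? hasAllSlots : hasAnySlot) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] twoJobs = {4, 4};
        // Each job ends up holding 3 of the 6 slots and waits for a fourth.
        System.out.println("pipelined, 6 slots: " + canMakeProgress(true, 6, twoJobs));  // false
        System.out.println("hybrid, 6 slots: " + canMakeProgress(false, 6, twoJobs));    // true
    }
}
```

With many short jobs submitted concurrently, as in the OLAP case, this kind of partial allocation becomes more likely, which is why removing the all-or-nothing requirement matters there.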
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Hi Weijie:

Thanks for the nice FLIP. I have a couple of questions about it:

1) In hybrid shuffle mode, the shuffle mode is decided by the available resources. If there is an unbounded data source but only enough resources to schedule the first stage, will it put a big burden on the disk/shuffle service? I think that would occupy all the resources.

2) Which kinds of jobs will benefit from hybrid shuffle mode? In other words, in which cases can we use it:
- For batch jobs that want to use more resources to reduce the e2e time?
- Or for streaming jobs that may temporarily lack resources?
- Or for OLAP jobs that try to make the best use of available resources, as you mentioned, to finish the query?
Just want to know the typical use case for hybrid shuffle mode :)

Best,
Aitozi.
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Yangze,

Thank you for the feedback! Here are my thoughts on your questions:

>>> How do we decide the size of the buffer pool in MemoryDataManager and the read buffers in FileDataManager?

The BufferPool in MemoryDataManager is the LocalBufferPool used by ResultPartition, and it is sized the same way as in the current sort-merge shuffle implementation. In other words, the minimum size of the BufferPool is a configurable fixed value, and the maximum size is Math.max(min, 4 * numSubpartitions). The default value can be determined by running the TPC-DS tests.

Read buffers in FileDataManager are requested from the BatchShuffleReadBufferPool shared by the TaskManager. Its size is controlled by *taskmanager.memory.framework.off-heap.batch-shuffle.size*, and the default value is 32M, which is consistent with the current sort-merge shuffle logic.

>>> Is there an upper limit for the sum of them? If there is, how does MemoryDataManager and FileDataManager sync the memory usage?

The buffers of the MemoryDataManager are limited by the size of the LocalBufferPool, whose upper limit is the size of the network memory. The buffers of the FileDataManager are requested directly from UnpooledOffHeapMemory and are limited by the size of the framework off-heap memory. Since the two draw on separate memory budgets, I think no additional synchronization mechanism is needed.

>>> How do you disable the slot sharing? If user configures both the slot sharing group and hybrid shuffle, what will happen to that job?

I think we can print a warning log during the JobGraph compilation stage when Hybrid Shuffle is enabled and a slot sharing group (SSG) is configured, and fall back to the region slot sharing group by default. Of course, the documentation will emphasize that we do not currently support SSG and that, if one is configured, the job falls back to the default.

Best regards,

Weijie
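To make the sizing rule from the reply concrete, here is a minimal sketch of the upper bound described for the LocalBufferPool. Only the formula Math.max(min, 4 * numSubpartitions) comes from the thread; the class name `BufferPoolBounds`, the method name `maxBuffers`, and the example minimum of 512 buffers are hypothetical and are not Flink APIs or defaults.

```java
/** Hypothetical helper, not a Flink class: illustrates the sizing rule from the reply. */
final class BufferPoolBounds {

    /**
     * Returns the maximum number of buffers for a result partition,
     * following the rule quoted in the thread: max(min, 4 * numSubpartitions).
     */
    static int maxBuffers(int minBuffers, int numSubpartitions) {
        if (minBuffers <= 0 || numSubpartitions <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        // multiplyExact throws instead of silently overflowing for huge inputs.
        return Math.max(minBuffers, Math.multiplyExact(4, numSubpartitions));
    }

    public static void main(String[] args) {
        // The minimum of 512 buffers is an arbitrary example value.
        System.out.println(maxBuffers(512, 100)); // few subpartitions: the minimum dominates
        System.out.println(maxBuffers(512, 200)); // many subpartitions: 4 * 200 dominates
    }
}
```

With a fixed minimum, the bound only grows once the partition has more than min / 4 subpartitions, so small jobs keep a predictable footprint while wide shuffles get proportionally more buffers.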
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Thanks for driving this, Xintong and Weijie.

I believe this feature will make Flink a better batch/OLAP engine. +1 for the overall design.

Some questions:
1. How do we decide the size of the buffer pool in MemoryDataManager and the read buffers in FileDataManager?
2. Is there an upper limit for the sum of them? If there is, how do MemoryDataManager and FileDataManager sync the memory usage?
3. How do you disable slot sharing? If a user configures both the slot sharing group and hybrid shuffle, what will happen to that job?

Best,
Yangze Guo

On Thu, May 19, 2022 at 2:41 PM Xintong Song wrote:
>
> Thanks for preparing this FLIP, Weijie.
>
> I think this is a good improvement on batch resource elasticity. Looking forward to the community feedback.
>
> Best,
>
> Xintong
>
> On Thu, May 19, 2022 at 2:31 PM weijie guo wrote:
>
> > Hi all,
> >
> > I’d like to start a discussion about FLIP-235[1], which introduces a new shuffle mode that can overcome some of the problems of Pipelined Shuffle and Blocking Shuffle in batch scenarios.
> >
> > Currently in Flink, task scheduling is more or less constrained by the shuffle implementations. This brings the following disadvantages:
> >
> > 1. Pipelined Shuffle:
> >    For pipelined shuffle, the upstream and downstream tasks are required to be deployed at the same time, to avoid upstream tasks being blocked forever. This is fine when there are enough resources for both upstream and downstream tasks to run simultaneously, but will cause the following problems otherwise:
> >    1. Pipelined shuffle connected tasks (i.e., a pipelined region) cannot be executed until resources are obtained for all of them, resulting in longer job finishing time and poorer resource efficiency due to holding part of the resources idle while waiting for the rest.
> >    2. More severely, if multiple jobs each hold part of the cluster resources and are waiting for more, a deadlock would occur. The chance of this is not trivial, especially for scenarios such as OLAP where concurrent job submissions are frequent.
> > 2. Blocking Shuffle:
> >    For blocking shuffle, execution of downstream tasks must wait for all upstream tasks to finish, even though more resources might be available. The sequential execution of upstream and downstream tasks significantly increases the job finishing time, and the disk IO workload for spilling and loading full intermediate data also affects the performance.
> >
> > We believe the root cause of the above problems is that shuffle implementations put unnecessary constraints on task scheduling.
> >
> > To solve this problem, Xintong Song and I propose to introduce hybrid shuffle to minimize the scheduling constraints. With Hybrid Shuffle, Flink should:
> >
> > 1. Make the best use of available resources.
> >    Ideally, we want Flink to always make progress if possible. That is to say, it should always execute a pending task if there are resources available for that task.
> > 2. Minimize disk IO load.
> >    In-flight data should be consumed directly from memory as much as possible. Only data that is not consumed in time should be spilled to disk.
> >
> > You can find more details in FLIP-235. Looking forward to your feedback.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> >
> > Best regards,
> >
> > Weijie
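The second goal in the quoted proposal (consume in-flight data from memory where possible, spill only what is not consumed in time) can be illustrated with a toy buffer. This is a sketch under stated assumptions, not the FLIP-235 design: the class `SpillingBuffer` is hypothetical, the "disk" is simulated with an in-memory queue, and the spill policy (spill whenever memory is full or older spilled data is still pending, to keep FIFO order) is a simplification chosen for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a memory-first buffer with spilling. Hypothetical, not a Flink class. */
final class SpillingBuffer {
    private final int memoryCapacity;
    private final Deque<String> memory = new ArrayDeque<>();
    // Stand-in for a spill file; a real implementation would write to disk.
    private final Deque<String> disk = new ArrayDeque<>();
    private int spilledCount;

    SpillingBuffer(int memoryCapacity) {
        if (memoryCapacity <= 0) {
            throw new IllegalArgumentException("memoryCapacity must be positive");
        }
        this.memoryCapacity = memoryCapacity;
    }

    /** Keeps the record in memory if possible, otherwise spills it. */
    void add(String record) {
        // Records go to memory only while nothing is spilled, so everything
        // in memory is always older than everything on "disk" (FIFO order).
        if (disk.isEmpty() && memory.size() < memoryCapacity) {
            memory.addLast(record);
        } else {
            disk.addLast(record);
            spilledCount++;
        }
    }

    /** Returns the oldest record, reading memory first; null if the buffer is empty. */
    String poll() {
        String record = memory.pollFirst();
        return record != null ? record : disk.pollFirst();
    }

    int spilledCount() {
        return spilledCount;
    }

    public static void main(String[] args) {
        SpillingBuffer buffer = new SpillingBuffer(2);
        buffer.add("a");
        buffer.add("b");
        buffer.add("c"); // memory is full, so "c" is spilled
        System.out.println(buffer.poll() + " spilled=" + buffer.spilledCount());
    }
}
```

A consumer that keeps up with the producer never triggers a spill in this model, which is the behaviour the proposal aims for; only a slow or not-yet-started consumer causes disk IO.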
Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode
Thanks for preparing this FLIP, Weijie.

I think this is a good improvement on batch resource elasticity. Looking forward to the community feedback.

Best,

Xintong
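As a rough illustration of the scheduling constraints that the FLIP-235 proposal in this thread attributes to each shuffle mode, the sketch below models a two-stage job. It is a simplified reading of the proposal text, not Flink's scheduler: the class `ShuffleScheduling`, its enum, and both methods are hypothetical, and it assumes every task needs exactly one slot.

```java
/** Simplified model of the constraints described in the proposal. Hypothetical names. */
final class ShuffleScheduling {

    enum Mode { PIPELINED, BLOCKING, HYBRID }

    /**
     * Minimum number of free slots needed before the job can start making
     * progress, assuming one slot per task.
     */
    static int slotsNeededToStart(Mode mode, int upstreamTasks, int downstreamTasks) {
        switch (mode) {
            case PIPELINED:
                // The whole pipelined region must be deployed at the same time.
                return upstreamTasks + downstreamTasks;
            case BLOCKING:
            case HYBRID:
                // A single pending task can run as soon as one slot is free.
                return 1;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    /** Whether a downstream task may start, given the state of its upstream tasks. */
    static boolean downstreamMayStart(Mode mode, boolean allUpstreamFinished) {
        // Only blocking shuffle forces downstream tasks to wait for upstream completion.
        return mode != Mode.BLOCKING || allUpstreamFinished;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode
                    + " slotsNeededToStart=" + slotsNeededToStart(mode, 4, 4)
                    + " downstreamMayStartEarly=" + downstreamMayStart(mode, false));
        }
    }
}
```

In this model, hybrid shuffle is the only mode that both starts with a single free slot and lets downstream tasks run before upstream tasks finish, which matches the two goals stated in the proposal.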
