Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-25 Thread weijie guo
Hi All,

Thanks for all the feedback about this FLIP.

Since there are no other concerns, I consider the FLIP-235 discussion closed. I will
open a vote today.

Best regards,

Weijie


On Wed, May 25, 2022 at 10:17 PM Xintong Song wrote:

> Ok, I think we are on the same page. I'm aware of
> ExecutionConfig#setExecutionMode, which sets the data exchanging mode at
> the job scope.
>
> Best,
>
> Xintong

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-25 Thread Xintong Song
Ok, I think we are on the same page. I'm aware of
ExecutionConfig#setExecutionMode, which sets the data exchanging mode at
the job scope.

Best,

Xintong



On Wed, May 25, 2022 at 9:50 PM Chesnay Schepler  wrote:

> You can influence it to some extent via ExecutionConfig#setExecutionMode.
> You can, for example, force all shuffles to use blocking exchanges.
>
> I'm not proposing an API that would allow this to be set per edge.

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-25 Thread Chesnay Schepler

You can influence it to some extent via ExecutionConfig#setExecutionMode.
You can, for example, force all shuffles to use blocking exchanges.

I'm not proposing an API that would allow this to be set per edge.

On 25/05/2022 15:23, Xintong Song wrote:
> In general, I agree with you about targeting jobs with no/few blocking
> exchanges for fine-grained recovery. The only problem is, correct me
> if I'm wrong, users currently cannot control the data exchanging mode
> of a specific edge. I'm not aware of such APIs.
>
> As a first step, I'd prefer excluding this from the scope of this FLIP.
>
> Best,
>
> Xintong




Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-25 Thread Xintong Song
In general, I agree with you about targeting jobs with no/few blocking
exchanges for fine-grained recovery. The only problem is, correct me if I'm
wrong, users currently cannot control the data exchanging mode of a
specific edge. I'm not aware of such APIs.

As a first step, I'd prefer excluding this from the scope of this FLIP.

Best,

Xintong



On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler  wrote:

> Yes; but that's also a limitation of the current fine-grained recovery.
>
> My suggestion was primarily aimed at jobs that have no/few blocking
> exchanges, where users would currently have to explicitly configure
> additional blocking exchanges to really get something out of
> fine-grained recovery (at the expense of e2e job duration).

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-25 Thread Chesnay Schepler

Yes; but that's also a limitation of the current fine-grained recovery.

My suggestion was primarily aimed at jobs that have no/few blocking 
exchanges, where users would currently have to explicitly configure 
additional blocking exchanges to really get something out of 
fine-grained recovery (at the expense of e2e job duration).


On 25/05/2022 14:47, Xintong Song wrote:
>> Will this also allow spilling everything to disk while also forwarding
>> data to the next task?
>
> Yes, as long as the downstream task has started, this always forwards the
> data, even while spilling everything.
>
>> This would allow us to improve fine-grained recovery by no longer being
>> constrained to pipelined regions.
>
> I think it helps prevent restarts of the upstream tasks of a failed task,
> but not of the downstream tasks. Because there is no guarantee that a
> restarted task will produce exactly the same data (in terms of order) as in
> the previous execution, the downstream tasks cannot resume consuming the data.
>
> Best,
>
> Xintong




Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-25 Thread Xintong Song
>
> Will this also allow spilling everything to disk while also forwarding
> data to the next task?
>

Yes, as long as the downstream task has started, this always forwards the
data, even while spilling everything.

> This would allow us to improve fine-grained recovery by no longer being
> constrained to pipelined regions.

I think it helps prevent restarts of the upstream tasks of a failed task,
but not of the downstream tasks. Because there is no guarantee that a
restarted task will produce exactly the same data (in terms of order) as in
the previous execution, the downstream tasks cannot resume consuming the data.
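
To make the restart argument concrete, here is a minimal Python sketch. It is
only an illustration under stated assumptions, not Flink's failover strategy:
the function name `tasks_to_restart`, the edge-list representation of the job
graph, and the `full_spill` flag are all hypothetical.

```python
def tasks_to_restart(edges, failed, full_spill):
    """Return the set of tasks that must restart when `failed` fails.

    edges:      list of (producer, consumer) pairs (hypothetical job graph).
    full_spill: True if every producer keeps its complete result on disk,
                so a restarted consumer can re-read it.
    """
    restart = {failed}
    changed = True
    while changed:  # iterate until the restart set stops growing
        changed = False
        for producer, consumer in edges:
            # A consumer of a restarted producer must restart too, because the
            # re-produced data may arrive in a different order.
            if producer in restart and consumer not in restart:
                restart.add(consumer)
                changed = True
            # Without the complete result on disk, the producer of a restarted
            # consumer has to run again to reproduce the data.
            if not full_spill and consumer in restart and producer not in restart:
                restart.add(producer)
                changed = True
    return restart


edges = [("source", "map"), ("map", "reduce"), ("reduce", "sink")]
print(sorted(tasks_to_restart(edges, "map", full_spill=True)))
print(sorted(tasks_to_restart(edges, "map", full_spill=False)))
```

With full spilling, only the failed task and its downstream tasks are in the
restart set; without it, the upstream tasks are pulled in as well.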


Best,

Xintong



On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler  wrote:

> Will this also allow spilling everything to disk while also forwarding
> data to the next task?
>
> This would allow us to improve fine-grained recovery by no longer being
> constrained to pipelined regions.

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-25 Thread Chesnay Schepler
Will this also allow spilling everything to disk while also forwarding 
data to the next task?


This would allow us to improve fine-grained recovery by no longer being 
constrained to pipelined regions.


On 25/05/2022 05:55, weijie guo wrote:

Hi All,
Thank you for your attention and feedback.
Do you have any other comments? If there are no other questions, I'll vote
on FLIP-235 tomorrow.

Best regards,

Weijie


On Fri, May 20, 2022 at 1:22 PM Aitozi wrote:

Hi Xintong,

Thanks for your detailed explanation. I misunderstood the spill
behavior at first glance; I get your point now. I think it will be a good
addition to the current execution modes.
Looking forward to it :)

Best,
Aitozi

On Fri, May 20, 2022 at 12:26 PM Xintong Song wrote:

Hi Aitozi,

> In which case we can use the hybrid shuffle mode

Just to directly answer this question, in addition to Weijie's
explanations: for a batch workload, if you want the workload to take
advantage of as many resources as are available, ranging from a single
slot to as many slots as there are tasks in total, you may consider hybrid
shuffle mode. Admittedly, this may not always be wanted, e.g., users may
not want to execute a job if there are too few resources available, or may
not want a job to take too many of the cluster resources. That's why we
propose hybrid shuffle as an additional option for batch users, rather
than a replacement for Pipelined or Blocking mode.

> So you mean the hybrid shuffle mode will be limited to bounded sources,
> right?

Yes.

> One more question: with bounded data and part of the stages running in
> Pipelined shuffle mode, what will be the behavior on task failure? Is
> checkpointing enabled for these running stages, or will they re-run after
> the failure?

There are no checkpoints. The failover behavior depends on the spilling
strategy.
- In the first version, we only consider a selective spilling strategy,
which spills as little data as possible to disk. This means that in
case of failover, upstream tasks need to be restarted to reproduce the
complete intermediate results.
- An alternative strategy we may introduce in the future, if needed, is to
spill the complete intermediate results. That avoids restarting upstream
tasks in case of failover, because the produced intermediate results can be
re-consumed, at the cost of more disk IO load.
With both strategies, the trade-off between failover cost and IO load is
for the user to decide. This is also discussed in the MemoryDataManager
section of the FLIP.
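
The trade-off between the two spilling strategies can be sketched with a toy
model in Python. This is an assumption-laden illustration, not the FLIP's
MemoryDataManager: the function `simulate`, its parameters, and the
buffer-counting model are all hypothetical.

```python
def simulate(num_buffers, memory_capacity, consume_every, spill_all):
    """Toy model of one producer writing `num_buffers` buffers.

    memory_capacity: how many unconsumed buffers fit in memory.
    consume_every:   the downstream task takes one buffer from memory for
                     every `consume_every` buffers produced (0 means the
                     downstream task has not started).
    spill_all:       True for the "spill everything" strategy, False for the
                     selective strategy.
    Returns (buffers_written_to_disk, complete_result_on_disk).
    """
    in_memory = 0
    spilled = 0
    for i in range(1, num_buffers + 1):
        if spill_all:
            # Every buffer goes to disk, whether or not it is also forwarded.
            spilled += 1
            continue
        in_memory += 1
        if consume_every and i % consume_every == 0 and in_memory > 0:
            in_memory -= 1  # consumed directly from memory, never hits disk
        if in_memory > memory_capacity:
            spilled += 1    # memory is full: spill one buffer
            in_memory -= 1
    return spilled, spilled == num_buffers


print(simulate(10, 4, 1, spill_all=False))  # downstream keeps up
print(simulate(10, 4, 0, spill_all=False))  # downstream not started
print(simulate(10, 4, 0, spill_all=True))   # everything spilled
```

In this model the selective strategy writes little or nothing when the
downstream task keeps up, but never leaves a complete result on disk, so a
failover would require re-running the producer. Spilling everything costs the
maximum disk IO and leaves a re-consumable result.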

Best,

Xintong



On Fri, May 20, 2022 at 12:10 PM Aitozi wrote:

Thanks Weijie for your answer. So you mean the hybrid shuffle mode will be
limited to bounded sources, right?
One more question: with bounded data and part of the stages running in
Pipelined shuffle mode, what will be the behavior on task failure? Is
checkpointing enabled for these running stages, or will they re-run after
the failure?

Best,
Aitozi

weijie guo  于2022年5月20日周五 10:45写道:


Hi, Aitozi:

Thank you for the feedback!
Here are some of my thoughts on your question


>>> 1.If there is an unbounded data source, but only have resource to
schedule the first stage, will it bring the big burden to the disk/shuffle
service which will occupy all the resource I think.
First of all, Hybrid Shuffle Mode is oriented to the batch job scenario, so
there is no problem of unbounded data sources. Secondly, if you consider
the stream scenario, I think Pipelined Shuffle should still be the best
choice at present. For an unbounded data stream, it is not meaningful to
only run some stages.


>>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
other words, In which case we can use the hybrid shuffle mode:
Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
shuffle mode can effectively utilize cluster resources and avoid some
unnecessary disk IO overhead. For OLAP scenarios, which are characterized
by a large number of concurrently submitted short batch jobs, hybrid
shuffle can solve the scheduling deadlock problem of pipelined shuffle and
achieve similar performance.

Best regards,

Weijie


On Fri, May 20, 2022 at 8:05 AM Aitozi wrote:


Hi Weijie:

  Thanks for the nice FLIP, I have couple questions about this:

1) In the hybrid shuffle mode, the shuffle mode is decided by the resource.
If there is an unbounded data source, but only have resource to schedule the
first stage, will it bring the big burden to the disk/shuffle service which
will occupy all the resource I think.

2) Which kind of job will benefit from the hybrid shuffle mode. In other
words, In which case we can use the hybrid shuffle mode:
- For batch job want to use more resource to reduce the e2e time ?
- Or for streaming job which may lack of resource temporarily ?
- Or for OLAP job which will try to make best use of available resources as
you mentioned to finish the query?
Just want to know the typical use case for the Hybrid shuffle mode :)


Best,
Ai

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-24 Thread weijie guo
Hi All,
Thank you for your attention and feedback.
Do you have any other comments? If there are no other questions, I'll start
the vote on FLIP-235 tomorrow.

Best regards,

Weijie


On Fri, May 20, 2022 at 1:22 PM Aitozi wrote:

> Hi Xintong
> Thanks for your detailed explanation, I misunderstand the spill
> behavior at first glance,
> I get your point now. I think it will be a good addition to the current
> execution mode.
> Looking forward to it :)
>
> Best,
> Aitozi
>
> On Fri, May 20, 2022 at 12:26 PM Xintong Song wrote:
>
> > Hi Aitozi,
> >
> > In which case we can use the hybrid shuffle mode
> >
> > Just to directly answer this question, in addition to
> > Weijie's explanations. For batch workload, if you want the workload to
> take
> > advantage of as many resources as available, which ranges from a single
> > slot to as many slots as the total tasks, you may consider hybrid shuffle
> > mode. Admittedly, this may not always be wanted, e.g., users may not want
> > to execute a job if there's too few resources available, or may not want
> a
> > job taking too many of the cluster resources. That's why we propose
> hybrid
> > shuffle as an additional option for batch users, rather than a
> replacement
> > for Pipelined or Blocking mode.
> >
> > So you mean the hybrid shuffle mode will limit its usage to the bounded
> > > source, Right ?
> > >
> > Yes.
> >
> > One more question, with the bounded data and partly of the stage is
> running
> > > in the Pipelined shuffle mode, what will be the behavior of the task
> > > failure, Is the checkpoint enabled for these running stages or will it
> > > re-run after the failure?
> > >
> > There's no checkpoints. The failover behavior depends on the spilling
> > strategy.
> > - In the first version, we only consider a selective spilling strategy,
> > which means spill data as little as possible to the disk, which means in
> > case of failover upstream tasks need to be restarted to reproduce the
> > complete intermediate results.
> > - An alternative strategy we may introduce in future if needed is to
> spill
> > the complete intermediate results. That avoids restarting upstream tasks
> in
> > case of failover, because the produced intermediate results can be
> > re-consumed, at the cost of more disk IO load.
> > With both strategies, the trade-off between failover cost and IO load is
> > for the user to decide. This is also discussed in the MemoryDataManager
> > section of the FLIP.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Fri, May 20, 2022 at 12:10 PM Aitozi  wrote:
> >
> > > Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> > > limit
> > > its usage to the bounded source, Right ?
> > > One more question, with the bounded data and partly of the stage is
> > running
> > > in the Pipelined shuffle mode, what will be the behavior of the task
> > > failure, Is the
> > > checkpoint enabled for these running stages or will it re-run after the
> > > failure?
> > >
> > > Best,
> > > Aitozi
> > >
> > > On Fri, May 20, 2022 at 10:45 AM weijie guo wrote:
> > >
> > > > Hi, Aitozi:
> > > >
> > > > Thank you for the feedback!
> > > > Here are some of my thoughts on your question
> > > >
> > > > >>> 1.If there is an unbounded data source, but only have resource to
> > > > schedule the first stage, will it bring the big burden to the
> > > disk/shuffle
> > > > service which will occupy all the resource I think.
> > > > First of all, Hybrid Shuffle Mode is oriented to the batch job
> > scenario,
> > > so
> > > > there is no problem of unbounded data sources. Secondly, if you
> > consider
> > > > the stream scenario, I think Pipelined Shuffle should still be the
> best
> > > > choice at present. For an unbounded data stream, it is not meaningful
> > to
> > > > only run some stages.
> > > >
> > > > >>> 2. Which kind of job will benefit from the hybrid shuffle mode.
> In
> > > > other words, In which case we can use the hybrid shuffle mode:
> > > > Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> > > > shuffle mode can effectively utilize cluster resources and avoid some
> > > > unnecessary disk IO overhead. For OLAP scenarios, which are
> > characterized
> > > > by a large number of concurrently submitted short batch jobs, hybrid
> > > > shuffle can solve the scheduling deadlock problem of pipelined
> shuffle
> > > and
> > > > achieve similar performance.
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > >
> > > > On Fri, May 20, 2022 at 8:05 AM Aitozi wrote:
> > > >
> > > > > Hi Weijie:
> > > > >
> > > > >  Thanks for the nice FLIP, I have couple questions about this:
> > > > >
> > > > > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> > > > resource.
> > > > > If there
> > > > > is an unbounded data source, but only have resource to schedule the
> > > first
> > > > > stage, will it
> > > > > bring the big burden to the disk/shuffle service which will occupy
> > all
> > > > the
> > > > > resource I think.
> > > > >
> > > > > 2) Which kind of job will benef

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread Aitozi
Hi Xintong
Thanks for your detailed explanation. I misunderstood the spill
behavior at first glance, but I get your point now. I think it will be a
good addition to the current execution modes.
Looking forward to it :)

Best,
Aitozi

On Fri, May 20, 2022 at 12:26 PM Xintong Song wrote:

> Hi Aitozi,
>
> In which case we can use the hybrid shuffle mode
>
> Just to directly answer this question, in addition to
> Weijie's explanations. For batch workload, if you want the workload to take
> advantage of as many resources as available, which ranges from a single
> slot to as many slots as the total tasks, you may consider hybrid shuffle
> mode. Admittedly, this may not always be wanted, e.g., users may not want
> to execute a job if there's too few resources available, or may not want a
> job taking too many of the cluster resources. That's why we propose hybrid
> shuffle as an additional option for batch users, rather than a replacement
> for Pipelined or Blocking mode.
>
> So you mean the hybrid shuffle mode will limit its usage to the bounded
> > source, Right ?
> >
> Yes.
>
> One more question, with the bounded data and partly of the stage is running
> > in the Pipelined shuffle mode, what will be the behavior of the task
> > failure, Is the checkpoint enabled for these running stages or will it
> > re-run after the failure?
> >
> There's no checkpoints. The failover behavior depends on the spilling
> strategy.
> - In the first version, we only consider a selective spilling strategy,
> which means spill data as little as possible to the disk, which means in
> case of failover upstream tasks need to be restarted to reproduce the
> complete intermediate results.
> - An alternative strategy we may introduce in future if needed is to spill
> the complete intermediate results. That avoids restarting upstream tasks in
> case of failover, because the produced intermediate results can be
> re-consumed, at the cost of more disk IO load.
> With both strategies, the trade-off between failover cost and IO load is
> for the user to decide. This is also discussed in the MemoryDataManager
> section of the FLIP.
>
> Best,
>
> Xintong
>
>
>
> On Fri, May 20, 2022 at 12:10 PM Aitozi  wrote:
>
> > Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> > limit
> > its usage to the bounded source, Right ?
> > One more question, with the bounded data and partly of the stage is
> running
> > in the Pipelined shuffle mode, what will be the behavior of the task
> > failure, Is the
> > checkpoint enabled for these running stages or will it re-run after the
> > failure?
> >
> > Best,
> > Aitozi
> >
> > On Fri, May 20, 2022 at 10:45 AM weijie guo wrote:
> >
> > > Hi, Aitozi:
> > >
> > > Thank you for the feedback!
> > > Here are some of my thoughts on your question
> > >
> > > >>> 1.If there is an unbounded data source, but only have resource to
> > > schedule the first stage, will it bring the big burden to the
> > disk/shuffle
> > > service which will occupy all the resource I think.
> > > First of all, Hybrid Shuffle Mode is oriented to the batch job
> scenario,
> > so
> > > there is no problem of unbounded data sources. Secondly, if you
> consider
> > > the stream scenario, I think Pipelined Shuffle should still be the best
> > > choice at present. For an unbounded data stream, it is not meaningful
> to
> > > only run some stages.
> > >
> > > >>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
> > > other words, In which case we can use the hybrid shuffle mode:
> > > Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> > > shuffle mode can effectively utilize cluster resources and avoid some
> > > unnecessary disk IO overhead. For OLAP scenarios, which are
> characterized
> > > by a large number of concurrently submitted short batch jobs, hybrid
> > > shuffle can solve the scheduling deadlock problem of pipelined shuffle
> > and
> > > achieve similar performance.
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > On Fri, May 20, 2022 at 8:05 AM Aitozi wrote:
> > >
> > > > Hi Weijie:
> > > >
> > > >  Thanks for the nice FLIP, I have couple questions about this:
> > > >
> > > > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> > > resource.
> > > > If there
> > > > is an unbounded data source, but only have resource to schedule the
> > first
> > > > stage, will it
> > > > bring the big burden to the disk/shuffle service which will occupy
> all
> > > the
> > > > resource I think.
> > > >
> > > > 2) Which kind of job will benefit from the hybrid shuffle mode. In
> > other
> > > > words, In which
> > > > case we can use the hybrid shuffle mode:
> > > > - For batch job want to use more resource to reduce the e2e time ?
> > > > - Or for streaming job which may lack of resource temporarily ?
> > > > - Or for OLAP job which will try to make best use of available
> > resources
> > > as
> > > > you mentioned to finish the query?
> > > > Just want to know the typical use case for the Hybrid shuffle m

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread Xintong Song
Hi Aitozi,

> In which case we can use the hybrid shuffle mode

To answer this question directly, in addition to Weijie's explanations: for
batch workloads, if you want the workload to take advantage of as many
resources as are available, ranging from a single slot to as many slots as
there are tasks in total, you may consider hybrid shuffle mode. Admittedly,
this may not always be wanted, e.g., users may not want to execute a job if
there are too few resources available, or may not want a job to take too
many of the cluster resources. That's why we propose hybrid shuffle as an
additional option for batch users, rather than a replacement for Pipelined
or Blocking mode.

> So you mean the hybrid shuffle mode will limit its usage to the bounded
> source, Right ?

Yes.

> One more question, with the bounded data and partly of the stage is running
> in the Pipelined shuffle mode, what will be the behavior of the task
> failure, Is the checkpoint enabled for these running stages or will it
> re-run after the failure?

There are no checkpoints. The failover behavior depends on the spilling
strategy.
- In the first version, we only consider a selective spilling strategy,
which spills as little data as possible to disk. This means that, in case
of failover, upstream tasks need to be restarted to reproduce the complete
intermediate results.
- An alternative strategy we may introduce in the future, if needed, is to
spill the complete intermediate results. That avoids restarting upstream
tasks in case of failover, because the produced intermediate results can be
re-consumed, at the cost of more disk IO load.
With both strategies available, the trade-off between failover cost and IO
load is for the user to decide. This is also discussed in the
MemoryDataManager section of the FLIP.
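
To make the trade-off a bit more concrete, here is a minimal sketch. It is
not code from the FLIP: the names SpillingStrategy, SELECTIVE, FULL and
tasksToRestart are hypothetical, and the model is deliberately simplified to
a single failed downstream task and its direct upstream producers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SpillingFailoverSketch {

    /** Hypothetical names for the two strategies described above. */
    public enum SpillingStrategy { SELECTIVE, FULL }

    /**
     * Returns the tasks that have to be restarted when a downstream task fails.
     * Assumption: with SELECTIVE spilling the intermediate result is only
     * partially on disk, so its producers must re-run; with FULL spilling the
     * complete result can be re-consumed from disk.
     */
    public static List<String> tasksToRestart(
            SpillingStrategy strategy, String failedTask, List<String> upstreamTasks) {
        List<String> restart = new ArrayList<>();
        restart.add(failedTask);
        if (strategy == SpillingStrategy.SELECTIVE) {
            // Less disk IO during normal execution, but a more expensive failover.
            restart.addAll(upstreamTasks);
        }
        return restart;
    }

    public static void main(String[] args) {
        List<String> upstream = Arrays.asList("map-1", "map-2");
        System.out.println(tasksToRestart(SpillingStrategy.SELECTIVE, "reduce-1", upstream));
        System.out.println(tasksToRestart(SpillingStrategy.FULL, "reduce-1", upstream));
    }
}
```

In this toy model the selective strategy restarts the failed task plus both
producers, while the full strategy restarts only the failed task.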

Best,

Xintong



On Fri, May 20, 2022 at 12:10 PM Aitozi  wrote:

> Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> limit
> its usage to the bounded source, Right ?
> One more question, with the bounded data and partly of the stage is running
> in the Pipelined shuffle mode, what will be the behavior of the task
> failure, Is the
> checkpoint enabled for these running stages or will it re-run after the
> failure?
>
> Best,
> Aitozi
>
> On Fri, May 20, 2022 at 10:45 AM weijie guo wrote:
>
> > Hi, Aitozi:
> >
> > Thank you for the feedback!
> > Here are some of my thoughts on your question
> >
> > >>> 1.If there is an unbounded data source, but only have resource to
> > schedule the first stage, will it bring the big burden to the
> disk/shuffle
> > service which will occupy all the resource I think.
> > First of all, Hybrid Shuffle Mode is oriented to the batch job scenario,
> so
> > there is no problem of unbounded data sources. Secondly, if you consider
> > the stream scenario, I think Pipelined Shuffle should still be the best
> > choice at present. For an unbounded data stream, it is not meaningful to
> > only run some stages.
> >
> > >>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
> > other words, In which case we can use the hybrid shuffle mode:
> > Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> > shuffle mode can effectively utilize cluster resources and avoid some
> > unnecessary disk IO overhead. For OLAP scenarios, which are characterized
> > by a large number of concurrently submitted short batch jobs, hybrid
> > shuffle can solve the scheduling deadlock problem of pipelined shuffle
> and
> > achieve similar performance.
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > On Fri, May 20, 2022 at 8:05 AM Aitozi wrote:
> >
> > > Hi Weijie:
> > >
> > >  Thanks for the nice FLIP, I have couple questions about this:
> > >
> > > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> > resource.
> > > If there
> > > is an unbounded data source, but only have resource to schedule the
> first
> > > stage, will it
> > > bring the big burden to the disk/shuffle service which will occupy all
> > the
> > > resource I think.
> > >
> > > 2) Which kind of job will benefit from the hybrid shuffle mode. In
> other
> > > words, In which
> > > case we can use the hybrid shuffle mode:
> > > - For batch job want to use more resource to reduce the e2e time ?
> > > - Or for streaming job which may lack of resource temporarily ?
> > > - Or for OLAP job which will try to make best use of available
> resources
> > as
> > > you mentioned to finish the query?
> > > Just want to know the typical use case for the Hybrid shuffle mode :)
> > >
> > >
> > > Best,
> > > Aitozi.
> > >
> > > On Thu, May 19, 2022 at 6:33 PM weijie guo wrote:
> > >
> > > > Yangze, Thank you for the feedback!
> > > > Here's my thoughts for your questions:
> > > >
> > > > >>> How do we decide the size of the buffer pool in MemoryDataManager
> > and
> > > > the read buffers in FileDataManager?
> > > > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > > > ResultPartition, and the size is the same as the current
> implementation
> > > of
> > > > sort-me

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread Aitozi
Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
be limited to bounded sources, right?
One more question: with bounded data and some of the stages running
in the Pipelined shuffle mode, what will be the behavior on task
failure? Is checkpointing enabled for these running stages, or will they
re-run after the failure?

Best,
Aitozi

On Fri, May 20, 2022 at 10:45 AM weijie guo wrote:

> Hi, Aitozi:
>
> Thank you for the feedback!
> Here are some of my thoughts on your question
>
> >>> 1.If there is an unbounded data source, but only have resource to
> schedule the first stage, will it bring the big burden to the disk/shuffle
> service which will occupy all the resource I think.
> First of all, Hybrid Shuffle Mode is oriented to the batch job scenario, so
> there is no problem of unbounded data sources. Secondly, if you consider
> the stream scenario, I think Pipelined Shuffle should still be the best
> choice at present. For an unbounded data stream, it is not meaningful to
> only run some stages.
>
> >>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
> other words, In which case we can use the hybrid shuffle mode:
> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> shuffle mode can effectively utilize cluster resources and avoid some
> unnecessary disk IO overhead. For OLAP scenarios, which are characterized
> by a large number of concurrently submitted short batch jobs, hybrid
> shuffle can solve the scheduling deadlock problem of pipelined shuffle and
> achieve similar performance.
>
> Best regards,
>
> Weijie
>
>
> On Fri, May 20, 2022 at 8:05 AM Aitozi wrote:
>
> > Hi Weijie:
> >
> >  Thanks for the nice FLIP, I have couple questions about this:
> >
> > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> resource.
> > If there
> > is an unbounded data source, but only have resource to schedule the first
> > stage, will it
> > bring the big burden to the disk/shuffle service which will occupy all
> the
> > resource I think.
> >
> > 2) Which kind of job will benefit from the hybrid shuffle mode. In other
> > words, In which
> > case we can use the hybrid shuffle mode:
> > - For batch job want to use more resource to reduce the e2e time ?
> > - Or for streaming job which may lack of resource temporarily ?
> > - Or for OLAP job which will try to make best use of available resources
> as
> > you mentioned to finish the query?
> > Just want to know the typical use case for the Hybrid shuffle mode :)
> >
> >
> > Best,
> > Aitozi.
> >
> > On Thu, May 19, 2022 at 6:33 PM weijie guo wrote:
> >
> > > Yangze, Thank you for the feedback!
> > > Here's my thoughts for your questions:
> > >
> > > >>> How do we decide the size of the buffer pool in MemoryDataManager
> and
> > > the read buffers in FileDataManager?
> > > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > > ResultPartition, and the size is the same as the current implementation
> > of
> > > sort-merge shuffle. In other words, the minimum value of BufferPool is
> a
> > > configurable fixed value, and the maximum value is Math.max(min, 4 *
> > > numSubpartitions). The default value can be determined by running the
> > > TPC-DS tests.
> > > Read buffers in FileDataManager are requested from the
> > > BatchShuffleReadBufferPool shared by TaskManager, it's size controlled
> by
> > > *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the default
> > > value is 32M, which is consistent with the current sort-merge shuffle
> > > logic.
> > >
> > > >>> Is there an upper limit for the sum of them? If there is, how does
> > > MemoryDataManager and FileDataManager sync the memory usage?
> > > The buffers of the MemoryDataManager are limited by the size of the
> > > LocalBufferPool, and the upper limit is the size of the Network Memory.
> > The
> > > buffers of the FileDataManager are directly requested from
> > > UnpooledOffHeapMemory, and are also limited by the size of the
> framework
> > > off-heap memory. I think there should be no need for additional
> > > synchronization mechanisms.
> > >
> > > >>> How do you disable the slot sharing? If user configures both the
> slot
> > > sharing group and hybrid shuffle, what will happen to that job?
> > > I think we can print a warning log when Hybrid Shuffle is enabled and
> SSG
> > > is configured during the JobGraph compilation stage, and fallback to
> the
> > > region slot sharing group by default. Of course, it will be emphasized
> in
> > > the document that we do not currently support SSG, If configured, it
> will
> > > fall back to the default.
> > >
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > On Thu, May 19, 2022 at 4:25 PM Yangze Guo wrote:
> > >
> > > > Thanks for driving this. Xintong and Weijie.
> > > >
> > > > I believe this feature will make Flink a better batch/OLAP engine. +1
> > > > for the overall design.
> > > >
> > > > Some questions:
> > > > 1. How do we decide the size of the buffer pool in MemoryDataManager
> > 

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread weijie guo
Hi, Aitozi:

Thank you for the feedback!
Here are some of my thoughts on your questions:

>>> 1.If there is an unbounded data source, but only have resource to
schedule the first stage, will it bring the big burden to the disk/shuffle
service which will occupy all the resource I think.
First of all, Hybrid Shuffle Mode targets the batch job scenario, so
unbounded data sources are not a concern here. Secondly, for the streaming
scenario, I think Pipelined Shuffle is still the best choice at present:
for an unbounded data stream, it is not meaningful to run only some of the
stages.

>>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
other words, In which case we can use the hybrid shuffle mode:
Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
shuffle mode can utilize cluster resources effectively and avoid some
unnecessary disk IO overhead. For OLAP scenarios, which are characterized
by a large number of concurrently submitted short batch jobs, hybrid
shuffle can solve the scheduling deadlock problem of pipelined shuffle
while achieving performance similar to pipelined shuffle.
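
As a rough illustration of the scheduling deadlock, here is a toy model. It
is not Flink's scheduler: the class and method names are hypothetical, and
it only encodes two assumptions from this thread, namely that a pipelined
region needs slots for all of its tasks before it can start, and that a
hybrid shuffle job can make progress with as little as a single slot.

```java
public class PipelinedDeadlockSketch {

    /** Assumption: a pipelined region can only start once every task in it has a slot. */
    public static boolean pipelinedCanStart(int slotsHeld, int tasksInRegion) {
        return slotsHeld >= tasksInRegion;
    }

    /** Assumption: with hybrid shuffle, any non-zero number of slots allows progress. */
    public static boolean hybridCanProgress(int slotsHeld) {
        return slotsHeld >= 1;
    }

    public static void main(String[] args) {
        int tasksPerJob = 4;     // each job consists of 4 pipelined-connected tasks
        int slotsHeldByJobA = 3; // a 6-slot cluster, split evenly between two jobs
        int slotsHeldByJobB = 3;

        // Neither job can start, and neither releases its slots: a deadlock.
        boolean deadlocked = !pipelinedCanStart(slotsHeldByJobA, tasksPerJob)
                && !pipelinedCanStart(slotsHeldByJobB, tasksPerJob);
        System.out.println("pipelined deadlock: " + deadlocked);

        // With hybrid shuffle, both jobs can run on the slots they already hold.
        boolean bothProgress = hybridCanProgress(slotsHeldByJobA)
                && hybridCanProgress(slotsHeldByJobB);
        System.out.println("hybrid makes progress: " + bothProgress);
    }
}
```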

Best regards,

Weijie


On Fri, May 20, 2022 at 8:05 AM Aitozi wrote:

> Hi Weijie:
>
>  Thanks for the nice FLIP, I have couple questions about this:
>
> 1) In the hybrid shuffle mode, the shuffle mode is decided by the resource.
> If there
> is an unbounded data source, but only have resource to schedule the first
> stage, will it
> bring the big burden to the disk/shuffle service which will occupy all the
> resource I think.
>
> 2) Which kind of job will benefit from the hybrid shuffle mode. In other
> words, In which
> case we can use the hybrid shuffle mode:
> - For batch job want to use more resource to reduce the e2e time ?
> - Or for streaming job which may lack of resource temporarily ?
> - Or for OLAP job which will try to make best use of available resources as
> you mentioned to finish the query?
> Just want to know the typical use case for the Hybrid shuffle mode :)
>
>
> Best,
> Aitozi.
>
> On Thu, May 19, 2022 at 6:33 PM weijie guo wrote:
>
> > Yangze, Thank you for the feedback!
> > Here's my thoughts for your questions:
> >
> > >>> How do we decide the size of the buffer pool in MemoryDataManager and
> > the read buffers in FileDataManager?
> > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > ResultPartition, and the size is the same as the current implementation
> of
> > sort-merge shuffle. In other words, the minimum value of BufferPool is a
> > configurable fixed value, and the maximum value is Math.max(min, 4 *
> > numSubpartitions). The default value can be determined by running the
> > TPC-DS tests.
> > Read buffers in FileDataManager are requested from the
> > BatchShuffleReadBufferPool shared by TaskManager, it's size controlled by
> > *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the default
> > value is 32M, which is consistent with the current sort-merge shuffle
> > logic.
> >
> > >>> Is there an upper limit for the sum of them? If there is, how does
> > MemoryDataManager and FileDataManager sync the memory usage?
> > The buffers of the MemoryDataManager are limited by the size of the
> > LocalBufferPool, and the upper limit is the size of the Network Memory.
> The
> > buffers of the FileDataManager are directly requested from
> > UnpooledOffHeapMemory, and are also limited by the size of the framework
> > off-heap memory. I think there should be no need for additional
> > synchronization mechanisms.
> >
> > >>> How do you disable the slot sharing? If user configures both the slot
> > sharing group and hybrid shuffle, what will happen to that job?
> > I think we can print a warning log when Hybrid Shuffle is enabled and SSG
> > is configured during the JobGraph compilation stage, and fallback to the
> > region slot sharing group by default. Of course, it will be emphasized in
> > the document that we do not currently support SSG, If configured, it will
> > fall back to the default.
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > On Thu, May 19, 2022 at 4:25 PM Yangze Guo wrote:
> >
> > > Thanks for driving this. Xintong and Weijie.
> > >
> > > I believe this feature will make Flink a better batch/OLAP engine. +1
> > > for the overall design.
> > >
> > > Some questions:
> > > 1. How do we decide the size of the buffer pool in MemoryDataManager
> > > and the read buffers in FileDataManager?
> > > 2. Is there an upper limit for the sum of them? If there is, how does
> > > MemoryDataManager and FileDataManager sync the memory usage?
> > > 3. How do you disable the slot sharing? If user configures both the
> > > slot sharing group and hybrid shuffle, what will happen to that job?
> > >
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Thu, May 19, 2022 at 2:41 PM Xintong Song 
> > > wrote:
> > > >
> > > > Thanks for preparing this FLIP, Weijie.
> > > >
> > > > I think this is a good improvement on batch resource elasticity.
> > Looking
> > > > forward to the community feedback.
> >

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread Aitozi
Hi Weijie:

Thanks for the nice FLIP. I have a couple of questions about it:

1) In the hybrid shuffle mode, the shuffle mode is decided by the available
resources. If there is an unbounded data source, but there are only enough
resources to schedule the first stage, will it put a big burden on the
disk/shuffle service? I think it would occupy all the resources.

2) Which kinds of jobs will benefit from the hybrid shuffle mode? In other
words, in which cases can we use the hybrid shuffle mode:
- For batch jobs that want to use more resources to reduce the e2e time?
- Or for streaming jobs that may temporarily lack resources?
- Or for OLAP jobs that try to make the best use of available resources,
as you mentioned, to finish the query?
I just want to know the typical use cases for the hybrid shuffle mode :)


Best,
Aitozi.

On Thu, May 19, 2022 at 6:33 PM weijie guo wrote:

> Yangze, Thank you for the feedback!
> Here's my thoughts for your questions:
>
> >>> How do we decide the size of the buffer pool in MemoryDataManager and
> the read buffers in FileDataManager?
> The BufferPool in MemoryDataManager is the LocalBufferPool used by
> ResultPartition, and the size is the same as the current implementation of
> sort-merge shuffle. In other words, the minimum value of BufferPool is a
> configurable fixed value, and the maximum value is Math.max(min, 4 *
> numSubpartitions). The default value can be determined by running the
> TPC-DS tests.
> Read buffers in FileDataManager are requested from the
> BatchShuffleReadBufferPool shared by TaskManager, it's size controlled by
> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the default
> value is 32M, which is consistent with the current sort-merge shuffle
> logic.
>
> >>> Is there an upper limit for the sum of them? If there is, how does
> MemoryDataManager and FileDataManager sync the memory usage?
> The buffers of the MemoryDataManager are limited by the size of the
> LocalBufferPool, and the upper limit is the size of the Network Memory. The
> buffers of the FileDataManager are directly requested from
> UnpooledOffHeapMemory, and are also limited by the size of the framework
> off-heap memory. I think there should be no need for additional
> synchronization mechanisms.
>
> >>> How do you disable the slot sharing? If user configures both the slot
> sharing group and hybrid shuffle, what will happen to that job?
> I think we can print a warning log when Hybrid Shuffle is enabled and SSG
> is configured during the JobGraph compilation stage, and fallback to the
> region slot sharing group by default. Of course, it will be emphasized in
> the document that we do not currently support SSG, If configured, it will
> fall back to the default.
>
>
> Best regards,
>
> Weijie
>
>
> On Thu, May 19, 2022 at 4:25 PM Yangze Guo wrote:
>
> > Thanks for driving this. Xintong and Weijie.
> >
> > I believe this feature will make Flink a better batch/OLAP engine. +1
> > for the overall design.
> >
> > Some questions:
> > 1. How do we decide the size of the buffer pool in MemoryDataManager
> > and the read buffers in FileDataManager?
> > 2. Is there an upper limit for the sum of them? If there is, how does
> > MemoryDataManager and FileDataManager sync the memory usage?
> > 3. How do you disable the slot sharing? If user configures both the
> > slot sharing group and hybrid shuffle, what will happen to that job?
> >
> >
> > Best,
> > Yangze Guo
> >
> > On Thu, May 19, 2022 at 2:41 PM Xintong Song 
> > wrote:
> > >
> > > Thanks for preparing this FLIP, Weijie.
> > >
> > > I think this is a good improvement on batch resource elasticity.
> Looking
> > > forward to the community feedback.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Thu, May 19, 2022 at 2:31 PM weijie guo 
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > >
> > > > I’d like to start a discussion about FLIP-235[1], which introduce a
> > new shuffle mode
> > > >  can overcome some of the problems of Pipelined Shuffle and Blocking
> > Shuffle in batch scenarios.
> > > >
> > > >
> > > > Currently in Flink, task scheduling is more or less constrained by
> the
> > shuffle implementations.
> > > > This will bring the following disadvantages:
> > > >
> > > >1. Pipelined Shuffle:
> > > > For pipelined shuffle, the upstream and downstream tasks are
> > required to be deployed at the same time, to avoid upstream tasks being
> > blocked forever. This is fine when there are enough resources for both
> > upstream and downstream tasks to run simultaneously, but will cause the
> > following problems otherwise:
> > > >1.
> > > >   Pipelined shuffle connected tasks (i.e., a pipelined region)
> > cannot be executed until obtaining resources for all of them, resulting
> in
> > longer job finishing time and poorer resource efficiency due to holding
> > part of the resources idle while waiting for the rest.
> > > >   2.
> > > >   More severely, if multiple jobs each hold part of the cluster
> > resources and are waiting for more, a deadlock 

Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread weijie guo
Yangze, thank you for the feedback!
Here are my thoughts on your questions:

>>> How do we decide the size of the buffer pool in MemoryDataManager and
the read buffers in FileDataManager?
The BufferPool in MemoryDataManager is the LocalBufferPool used by
ResultPartition, and it is sized the same way as in the current sort-merge
shuffle implementation. In other words, the minimum size of the BufferPool
is a configurable fixed value, and the maximum size is Math.max(min, 4 *
numSubpartitions). The default value can be determined by running the
TPC-DS tests.
Read buffers in FileDataManager are requested from the
BatchShuffleReadBufferPool shared by the TaskManager. Its size is controlled
by *taskmanager.memory.framework.off-heap.batch-shuffle.size*, whose default
value is 32M, consistent with the current sort-merge shuffle logic.
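
To make the sizing rule concrete, here is a minimal sketch of the upper
bound described above. It is not Flink code: the class name, the method name
and the sample minimum of 512 buffers are assumptions chosen only for
illustration; the only part taken from the discussion is the formula
Math.max(min, 4 * numSubpartitions).

```java
// Illustrative sketch only; not part of Flink.
final class HybridShuffleBufferSizing {

    // Upper bound of the buffer pool as stated in the thread:
    // Math.max(min, 4 * numSubpartitions).
    public static int maxBuffers(int minBuffers, int numSubpartitions) {
        return Math.max(minBuffers, 4 * numSubpartitions);
    }

    public static void main(String[] args) {
        int min = 512; // hypothetical configured minimum, not a Flink default

        // Few subpartitions: the configured minimum dominates.
        System.out.println(maxBuffers(min, 100));

        // Many subpartitions: 4 * numSubpartitions dominates.
        System.out.println(maxBuffers(min, 1000));
    }
}
```

With a fixed minimum, the pool only grows beyond it once the number of
subpartitions exceeds min / 4.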

>>> Is there an upper limit for the sum of them? If there is, how do
MemoryDataManager and FileDataManager sync the memory usage?
The buffers of the MemoryDataManager are limited by the size of the
LocalBufferPool, whose upper limit is the size of the Network Memory. The
buffers of the FileDataManager are requested directly from
UnpooledOffHeapMemory, and are limited by the size of the framework
off-heap memory. Since the two are bounded separately, I think there is no
need for an additional synchronization mechanism.
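
As a rough illustration of why no synchronization is needed, the sketch
below models the two limits as independent budgets. Everything in it is
hypothetical (the Budget class, the byte counts and the names); it only shows
that exhausting one budget does not affect requests against the other.

```java
// Illustrative sketch only; not part of Flink.
final class IndependentBudgetsSketch {

    // A simple capped counter standing in for a bounded memory budget.
    static final class Budget {
        private final long capacityBytes;
        private long usedBytes;

        Budget(long capacityBytes) {
            this.capacityBytes = capacityBytes;
        }

        // Reserves the bytes if they fit under the cap; otherwise changes nothing.
        public boolean tryReserve(long bytes) {
            if (usedBytes + bytes > capacityBytes) {
                return false;
            }
            usedBytes += bytes;
            return true;
        }
    }

    public static void main(String[] args) {
        Budget networkMemory = new Budget(64);    // stands in for Network Memory
        Budget frameworkOffHeap = new Budget(32); // stands in for framework off-heap memory

        System.out.println(networkMemory.tryReserve(64));    // fills the first budget
        System.out.println(frameworkOffHeap.tryReserve(32)); // second budget is unaffected
        System.out.println(networkMemory.tryReserve(1));     // first budget is exhausted
    }
}
```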

>>> How do you disable the slot sharing? If a user configures both the slot
sharing group and hybrid shuffle, what will happen to that job?
I think we can log a warning during the JobGraph compilation stage when
Hybrid Shuffle is enabled and an SSG is configured, and fall back to the
region slot sharing group by default. Of course, the documentation will
emphasize that we do not currently support SSG with Hybrid Shuffle and that,
if one is configured, it will fall back to the default.
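
The fallback could look roughly like the sketch below. This is only an
assumption about the shape of the check, not the actual JobGraph compilation
code: the class, the method and the "region-default" group name are all
hypothetical placeholders.

```java
import java.util.logging.Logger;

// Illustrative sketch only; not part of Flink.
final class SlotSharingFallbackSketch {

    private static final Logger LOG =
            Logger.getLogger(SlotSharingFallbackSketch.class.getName());

    // Hypothetical placeholder for the default region slot sharing group.
    static final String REGION_DEFAULT_GROUP = "region-default";

    // Returns the group that would effectively be used.
    // userGroup is null when the user configured no slot sharing group.
    public static String resolveGroup(boolean hybridShuffleEnabled, String userGroup) {
        if (userGroup == null) {
            // Nothing configured: use the default (a simplification of this sketch).
            return REGION_DEFAULT_GROUP;
        }
        if (hybridShuffleEnabled) {
            // Warn and ignore the user's setting, as proposed in the thread.
            LOG.warning("Slot sharing group '" + userGroup
                    + "' is ignored because hybrid shuffle is enabled; "
                    + "falling back to the default.");
            return REGION_DEFAULT_GROUP;
        }
        return userGroup;
    }

    public static void main(String[] args) {
        System.out.println(resolveGroup(true, "my-group"));
        System.out.println(resolveGroup(false, "my-group"));
    }
}
```

The point of the sketch is that the job is not rejected: the user's setting
is overridden and the override is made visible in the log.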


Best regards,

Weijie



Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread Yangze Guo
Thanks for driving this, Xintong and Weijie.

I believe this feature will make Flink a better batch/OLAP engine. +1
for the overall design.

Some questions:
1. How do we decide the size of the buffer pool in MemoryDataManager
and the read buffers in FileDataManager?
2. Is there an upper limit for the sum of them? If there is, how do
MemoryDataManager and FileDataManager sync the memory usage?
3. How do you disable the slot sharing? If a user configures both the
slot sharing group and hybrid shuffle, what will happen to that job?


Best,
Yangze Guo



Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-18 Thread Xintong Song
Thanks for preparing this FLIP, Weijie.

I think this is a good improvement on batch resource elasticity. Looking
forward to the community feedback.

Best,

Xintong



On Thu, May 19, 2022 at 2:31 PM weijie guo wrote:

> Hi all,
>
>
> I’d like to start a discussion about FLIP-235 [1], which introduces a new
> shuffle mode that can overcome some of the problems of Pipelined Shuffle
> and Blocking Shuffle in batch scenarios.
>
>
> Currently in Flink, task scheduling is more or less constrained by the
> shuffle implementations. This brings the following disadvantages:
>
> 1. Pipelined Shuffle:
>    For pipelined shuffle, the upstream and downstream tasks are required
>    to be deployed at the same time, to avoid upstream tasks being blocked
>    forever. This is fine when there are enough resources for both upstream
>    and downstream tasks to run simultaneously, but causes the following
>    problems otherwise:
>    1. Tasks connected by pipelined shuffle (i.e., a pipelined region)
>       cannot be executed until resources are obtained for all of them,
>       resulting in longer job finishing time and poorer resource
>       efficiency, because part of the resources are held idle while
>       waiting for the rest.
>    2. More severely, if multiple jobs each hold part of the cluster
>       resources and are waiting for more, a deadlock would occur. The
>       chance is not trivial, especially for scenarios such as OLAP where
>       concurrent job submissions are frequent.
> 2. Blocking Shuffle:
>    For blocking shuffle, execution of downstream tasks must wait for all
>    upstream tasks to finish, even though more resources might be
>    available. The sequential execution of upstream and downstream tasks
>    significantly increases the job finishing time, and the disk IO
>    workload for spilling and loading the full intermediate data also
>    affects performance.
>
>
> We believe the root cause of the above problems is that shuffle 
> implementations put unnecessary constraints on task scheduling.
>
>
> To solve this problem, Xintong Song and I propose to introduce hybrid shuffle 
> to minimize the scheduling constraints. With Hybrid Shuffle, Flink should:
>
> 1. Make best use of available resources.
>    Ideally, we want Flink to always make progress if possible. That is to
>    say, it should always execute a pending task if there are resources
>    available for that task.
> 2. Minimize disk IO load.
>    In-flight data should be consumed directly from memory as much as
>    possible. Only data that is not consumed in time should be spilled to
>    disk.
>
> You can find more details in FLIP-235. Looking forward to your feedback.
>
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
>
>
>
> Best regards,
>
> Weijie
>