Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-20 Thread Zhu Zhu
Thanks Till for the explanation! That looks good to me.

Thanks,
Zhu Zhu


Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-20 Thread Till Rohrmann
Hi Zhu Zhu,

the cluster partition does not need to be registered at the RM before it
can be used. The cluster partition descriptor will be reported to the
client as part of the job execution result. This information is used to
construct a JobGraph which can consume from a cluster partition. The
cluster partition descriptor contains all the information necessary to read
the partition. Hence, a job consuming this partition will simply deploy the
consumer on a TM and then read the cluster partition described by the
cluster partition descriptor. If the partition is no longer available, then
the job will fail and the client needs to handle the situation. If the
client knows how to reprocess the partition, then it would submit the
producing job.

Cheers,
Till
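The flow Till describes can be sketched as follows. All types and method names here (ClusterPartitionDescriptor, buildConsumingJobGraph, etc.) are invented for illustration and are not Flink's actual API:

```java
import java.util.Optional;

// Hypothetical, simplified types -- illustrative only, not Flink's actual API.
public class DescriptorFlowSketch {

    // Contains all the information necessary to read the partition from a TM,
    // so no registration at the RM is needed before it can be consumed.
    record ClusterPartitionDescriptor(String partitionId, String producerTmAddress) {}

    // Client-side: build a JobGraph (represented here as a string) that
    // consumes directly from the described cluster partition.
    static String buildConsumingJobGraph(ClusterPartitionDescriptor d) {
        return "JobGraph[source=clusterPartition:" + d.partitionId()
                + "@" + d.producerTmAddress() + "]";
    }

    // If the partition is no longer available, the consuming job fails and the
    // client may resubmit the producing job to recreate it.
    static String consumeOrReproduce(Optional<ClusterPartitionDescriptor> descriptor,
                                     Runnable resubmitProducingJob) {
        if (descriptor.isEmpty()) {
            resubmitProducingJob.run();
            return "producing job resubmitted";
        }
        return buildConsumingJobGraph(descriptor.get());
    }
}
```

The key point of the design is visible in the sketch: the descriptor travels with the job execution result, so the client never has to query the RM before consuming.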


Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-20 Thread Zhu Zhu
Thanks Chesnay for proposing this FLIP! And sorry for the late response on
it.
The FLIP overall looks good to me, except for one question.

- If a cluster partition does not exist in the RM, how can users tell whether
it has not been produced yet or has already been released?
Users/InteractiveQuery may need this information to decide whether to
wait or to re-execute the producer job.
One way I can think of is to also check the producer job's state: an
unavailable partition of a finished job means the partition was released.
But since cluster partitions are reported to the RM via TM heartbeats, there
can be a bad case in which the job is finished but the partition has not yet
been reported to the RM.
One solution for this bad case might be for the TM to notify the RM instantly
when partitions are promoted, as a supplement to the heartbeat mechanism. This
would also shorten the time a consumer job waits for a cluster partition to
become available, especially for a sequence of short-lived jobs. This however
introduces a JM dependency on the RM when a job finishes, which is unwanted.


Thanks,
Zhu Zhu
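Zhu Zhu's instant-notification idea above could look roughly like this sketch. The stub class and method names are hypothetical, purely to illustrate an eager RPC supplementing the periodic heartbeat report:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stubs: the TM notifies the RM instantly on promotion, while
// the periodic heartbeat report remains the authoritative full sync.
public class PromotionNotificationSketch {

    static class ResourceManagerStub {
        final Set<String> knownClusterPartitions = new HashSet<>();

        // Instant path: called by the TM right when partitions are promoted.
        void onPartitionsPromoted(Set<String> partitionIds) {
            knownClusterPartitions.addAll(partitionIds);
        }

        // Heartbeat path: the full report replaces the previous view, so a
        // missed instant notification is repaired at the next heartbeat.
        void onHeartbeatReport(Set<String> partitionIds) {
            knownClusterPartitions.clear();
            knownClusterPartitions.addAll(partitionIds);
        }
    }

    static class TaskManagerStub {
        final Set<String> promotedPartitions = new HashSet<>();
        final ResourceManagerStub rm;

        TaskManagerStub(ResourceManagerStub rm) {
            this.rm = rm;
        }

        void promote(String partitionId) {
            promotedPartitions.add(partitionId);
            rm.onPartitionsPromoted(Set.of(partitionId)); // no heartbeat delay
        }
    }
}
```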


Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-15 Thread Chesnay Schepler

I have updated the FLIP.

- adopted the job-/cluster partitions naming scheme
- outlined the interface for the new component living in the RM (currently 
called ThinShuffleMaster, but I'm not a fan of the name; suggestions 
would be appreciated)
- added a note that the ShuffleService changes are only necessary for 
external shuffle services and could be omitted in a first version


Unless there are objections I'll start a vote thread later today.
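The RM-side component could be as small as the following sketch. ThinShuffleMaster is the working name mentioned in the thread, but the method signatures here are guesses for illustration, not the proposed API; the in-memory implementation is only a test double:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Sketch of a lean RM-side interface limited to the two operations discussed:
// listing and deleting cluster partitions. Signatures are illustrative.
interface ThinShuffleMasterSketch {
    CompletableFuture<Collection<String>> listClusterPartitions();
    CompletableFuture<Void> releaseClusterPartition(String partitionId);
}

// Trivial in-memory implementation, e.g. what a test double might look like.
class InMemoryThinShuffleMaster implements ThinShuffleMasterSketch {
    private final Set<String> partitions = new HashSet<>();

    void register(String partitionId) {
        partitions.add(partitionId);
    }

    @Override
    public CompletableFuture<Collection<String>> listClusterPartitions() {
        return CompletableFuture.completedFuture(new ArrayList<>(partitions));
    }

    @Override
    public CompletableFuture<Void> releaseClusterPartition(String partitionId) {
        partitions.remove(partitionId);
        return CompletableFuture.completedFuture(null);
    }
}
```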


Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-13 Thread Zhijiang
Thanks for these further considerations Chesnay!

I guess we might have some misunderstanding. Actually I was not against the 
proposal Till suggested before, and I think it is a formal way to do that.

And my previous proposal was not about excluding the ShuffleService completely. 
The ShuffleService can be regarded as a factory for creating the ShuffleMaster 
on the JM/RM side and the ShuffleEnvironment on the TE side. 
For the ShuffleEnvironment on the TE side: I have no concerns at all. The TE 
receives an RPC call for deleting local/global partitions and then handles it 
via the ShuffleEnvironment, in the same way as for local partitions now.
For the ShuffleMaster side: I saw some previous discussions on multiple 
ShuffleMaster instances running in different components. I was not against this 
approach in essence, but only wondered whether it might make this feature 
complex. So my proposal was only about excluding the ShuffleMaster, if 
possible, to make the implementation a bit easier. I thought there might be a 
PartitionTracker-like component in the RM for tracking/deleting global 
partitions, just as we do now in the JM. The partition state is reported from 
the TE and maintained in the RM's PartitionTracker, and the PartitionTracker 
could trigger global partition release via the TE gateway directly, not via the 
ShuffleMaster (which is also stateless now). Actually, in the existing 
PartitionTrackerImpl in the JM, the RPC call to TE#releasePartitions is in some 
cases also triggered not via the ShuffleMaster, which can be regarded as a 
shortcut. Of course I am also in favour of always calling the actual partition 
release via the ShuffleMaster; that form seems elegant. 
I do not expect my inconsequential thought to block this feature or disturb 
your previous conclusion. Moreover, Till's recent reply already dispels my 
previous concern. :)

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time:2019年10月14日(星期一) 07:00
To:dev ; Till Rohrmann ; zhijiang 

Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle

I'm quite torn on whether to exclude the ShuffleServices from the 
proposal. I think I'm now on my third or fourth iteration for a 
response, so I'll just send both so I can stop thinking for a bit about 
whether to push for one or the other:

Opinion A, aka "Nu Uh":

I'm not in favor of excluding the shuffle master from this proposal;
I believe it raises interesting questions that should be discussed
beforehand; otherwise we may just end up developing ourselves into a
corner.
Unless there are good reasons for doing so I'd prefer to keep the
functionality across shuffle services consistent.
And man, my last sentence is giving me headaches (how can you
introduce inconsistencies across shuffle services if you don't even
touch them?..)

Ultimately the RM only needs the ShuffleService for 2 things, which
are fairly straight-forward:

 1. list partitions
 2. delete partitions

Both of these are /exclusively /used via the REST APIs. In terms of
scope I wanted this proposal to contain something that feels
complete. If there is functionality to have a partition stick
around, there needs to be a mechanism to delete it. Thus you also
need a way to list them, simply for practical purposes. I do believe
that without these this whole proposal is very much incomplete and
would hate to see them excluded. It just /makes sense/ to have them.
Yes, technically speak

Could we exclude the external shuffle services from this logic?
Sure, but I'm quite worried that we will not tackle this problem
again for 1.10, and if we don't we end up with really inconsistent
behavior across versions. In 1.9 you can have local state in your
master implementation, and, bar extraordinary circumstances, will
get a release call for partition that was registered. In 1.10 that
last part that goes down the drain, and in 1.X the last part is back
in play but you can't have local state anymore since another
instance is running on the RM.

Who is even supposed to keep up with that? It's still an interface
that is exposed to every user. I don't think we should impose
constraints in such a cut loose fashion.

At last, the fact that we can implement this in a way where it works
for some shuffle services and not others should already be quite a
red flag. The RM maybe shouldn't do any tracking and just forward
the heartbeat payload to the ThinShuffleMaster present on the RM.

Opinion B, aka "technically it would be fine"

The counterpoint to the whole REST API completeness argument is that
while the /runtime //supports /having partitions stick around, there
is technically no way for anyone to enable such behavior at runtime.
Hence, with no user-facing APIs to enable the feature, we don't
necessarily need a user-facing 

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-13 Thread Zhijiang
Thanks for the further explanation Till!

It is fine for me to run only one ShuffleMaster instance as now, and make the RM 
handle the deletion of cluster partitions in a light-weight way.
I also have no concerns about letting the TE handle the deletion of cluster 
partitions, as is done for job partitions now. 

Best,
Zhijiang
--
From:Till Rohrmann 
Send Time: Sunday, Oct 13, 2019, 17:04
To:zhijiang 
Cc:dev 
Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle

I think we won't necessarily run multiple ShuffleMasters. I think it would
be better to pass a leaner interface into the RM that only handles the
deletion of the global result partitions.

Letting the TEs handle the deletion of the global result partitions might
work as long as we don't have an external shuffle service implementation.
Hence, it could be a first step to decrease complexity but in order to
complete this feature, I think we need to do it differently.

Cheers,
Till

On Sat, Oct 12, 2019 at 7:39 AM zhijiang 
wrote:

> Sorry for the delay in catching up with the recent progress. Thanks for the FLIP
> update and valuable discussions!
>
> I also like the term of job/cluster partitions, and agree with most of the
> previous comments.
>
> Only one concern left, on the ShuffleMaster side:
> >However, if the separation of JM/RM into separate processes, as outlined
> in FLIP-6, is ever fully realized it necessarily implies that multiple
> shuffle master instances may exist for a given shuffle service.
>
> My previous thought was that one ShuffleService factory is for creating
> one ShuffleMaster instance. If we have multiple ShuffleMaster instances, we
> might also need different ShuffleService factories.
> And it seems that different ShuffleMaster instances could run in different
> components based on demands, e.g. dispatcher, JM, RM.
>
> Is it also feasible to not touch the ShuffleMaster concept in this FLIP, to
> make things a bit easier? I mean the ShuffleMaster would still run in the JM
> component and be responsible for job partitions. For the case of cluster
> partitions, the RM could interact with the TE directly. The TE would report
> global partitions as payloads via the heartbeat with the RM, and the RM
> could call TE#releaseGlobalPartitions directly, not via the ShuffleMaster.
> The RM could even pass the globally released partitions via payloads in the
> heartbeat with the TE to avoid an additional explicit RPC call, but this
> would introduce some delay for releasing partitions, depending on the
> heartbeat interval.
>
> Best,
> Zhijiang
> --
> From:Chesnay Schepler 
> Send Time: Friday, Oct 11, 2019, 10:21
> To:dev ; Till Rohrmann 
> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
>
> Ah, I like job-/cluster partitions.
>
> On 10/10/2019 16:27, Till Rohrmann wrote:
> > I think we should introduce a separate interface for the ResourceManager
> so
> > that it can list and delete global result partitions from the shuffle
> > service implementation. As long as the JM and RM run in the same process,
> > this interface could be implemented by the ShuffleMaster implementations.
> > However, we should make sure that we don't introduce unnecessary
> > concurrency. If that should be the case, then it might be simpler to have
> > two separate components.
> >
> > Some ideas for the naming problem:
> >
> > local/global: job/cluster, intra/inter
> >
> > Cheers,
> > Till
> >
> > On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler 
> wrote:
> >
> >> Are there any other opinions in regards to the naming scheme?
> >> (local/global, promote)
> >>
> >> On 06/09/2019 15:16, Chesnay Schepler wrote:
> >>> Hello,
> >>>
> >>> FLIP-36 (interactive programming)
> >>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>
> >>> proposes a new programming paradigm where jobs are built incrementally
> >>> by the user.
> >>>
> >>> To support this in an efficient manner I propose to extend partition
> >>> life-cycle to support the notion of /global partitions/, which are
> >>> partitions that can exist beyond the life-time of a job.
> >>>
> >>> These partitions could then be re-used by subsequent jobs in a fairly
> >>> efficient manner, as they don't have to be persisted to external
> >>> storage first, and consuming tasks could be scheduled to exploit
> >>> data-locality.
> >>>
> >>> The FLIP outlines the required changes on the JobMaster, TaskExecutor
> >>> and ResourceManager to support this from a life-cycle perspective.
> >>>
> >>> This FLIP does /not/ concern itself with the /usage/ of global
> >>> partitions, including client-side APIs, job-submission, scheduling and
> >>> reading said partitions; these are all follow-ups that will either be
> >>> part of FLIP-36 or spliced out into separate FLIPs.
> >>>
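Zhijiang's heartbeat idea above (the TE reporting cluster partitions as heartbeat payload, with releases piggybacked on the response) could be sketched as follows; all type and field names are invented for illustration:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative-only types for the heartbeat-payload variant discussed above.
public class HeartbeatPayloadSketch {

    // TE -> RM: full report of this TE's cluster partitions, sent with each heartbeat.
    record ClusterPartitionReport(String taskExecutorId, Set<String> partitionIds) {}

    // RM -> TE: partitions the TE should release, piggybacked on the heartbeat
    // response (and therefore delayed by up to one heartbeat interval).
    record HeartbeatResponse(Set<String> partitionsToRelease) {}

    // RM side: everything the TE reports but the RM no longer retains is released.
    static HeartbeatResponse handleReport(ClusterPartitionReport report,
                                          Set<String> retainedPartitions) {
        Set<String> toRelease = new HashSet<>(report.partitionIds());
        toRelease.removeAll(retainedPartitions);
        return new HeartbeatResponse(toRelease);
    }
}
```

This also makes the trade-off Zhijiang mentions concrete: the piggybacked release saves an explicit RPC but is only as fresh as the heartbeat interval.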



Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-13 Thread Chesnay Schepler
I'm quite torn on whether to exclude the ShuffleServices from the 
proposal. I think I'm now on my third or fourth iteration of a 
response, so I'll just send both so I can stop thinking for a bit about 
whether to push for one or the other:


Opinion A, aka "Nu Uh":

   I'm not in favor of excluding the shuffle master from this proposal;
   I believe it raises interesting questions that should be discussed
   beforehand; otherwise we may just end up developing ourselves into a
   corner.
   Unless there are good reasons for doing so I'd prefer to keep the
   functionality across shuffle services consistent.
   And man, my last sentence is giving me headaches (how can you
   introduce inconsistencies across shuffle services if you don't even
   touch them?..)

   Ultimately the RM only needs the ShuffleService for 2 things, which
   are fairly straight-forward:

1. list partitions
2. delete partitions

   Both of these are /exclusively/ used via the REST APIs. In terms of
   scope I wanted this proposal to contain something that feels
   complete. If there is functionality to have a partition stick
   around, there needs to be a mechanism to delete it. Thus you also
   need a way to list them, simply for practical purposes. I do believe
   that without these this whole proposal is very much incomplete and
   would hate to see them excluded. It just /makes sense/ to have them.
   Yes, technically speak

   Could we exclude the external shuffle services from this logic?
   Sure, but I'm quite worried that we will not tackle this problem
   again for 1.10, and if we don't we end up with really inconsistent
   behavior across versions. In 1.9 you can have local state in your
   master implementation, and, bar extraordinary circumstances, will
   get a release call for a partition that was registered. In 1.10 that
   last part goes down the drain, and in 1.X the last part is back
   in play, but you can't have local state anymore since another
   instance is running on the RM.

   Who is even supposed to keep up with that? It's still an interface
   that is exposed to every user. I don't think we should impose
   constraints in such a cut loose fashion.

   At last, the fact that we can implement this in a way where it works
   for some shuffle services and not others should already be quite a
   red flag. The RM maybe shouldn't do any tracking and just forward
   the heartbeat payload to the ThinShuffleMaster present on the RM.

Opinion B, aka "technically it would be fine"

   The counterpoint to the whole REST API completeness argument is that
   while the /runtime/ /supports/ having partitions stick around, there
   is technically no way for anyone to enable such behavior at runtime.
   Hence, with no user-facing APIs to enable the feature, we don't
   necessarily need a user-facing API for management purposes, and
   could defer both to a later point where this feature is exposed
   fully to users.

   But then it's hard to justify having any communication between the
   TE and RM at all; it literally serves no purpose. The TE could just
   keep cluster partitions around until the RM disconnects, which would
   then also raise the question of what exactly of substance is left in
   this proposal.

@Till yes, the RM should work against a different interface; I don't 
think anyone has argued against that. Let's put this point to rest. :)


On 13/10/2019 11:04, Till Rohrmann wrote:

I think we won't necessarily run multiple ShuffleMasters. I think it would
be better to pass a leaner interface to the RM to only handle the
deletion of the global result partitions.

Letting the TEs handle the deletion of the global result partitions might
work as long as we don't have an external shuffle service implementation.
Hence, it could be a first step to decrease complexity but in order to
complete this feature, I think we need to do it differently.

Cheers,
Till

On Sat, Oct 12, 2019 at 7:39 AM zhijiang 
wrote:


Sorry for the delay in catching up with the recent progress. Thanks for the FLIP
update and valuable discussions!

I also like the term job/cluster partitions, and agree with most of the
previous comments.

Only one concern left on the ShuffleMaster side:

However, if the separation of JM/RM into separate processes, as outlined

in FLIP-6, is ever fully realized it necessarily implies that multiple
shuffle master instances may exist for a given shuffle service.

My previous thought was that one ShuffleService factory is for creating
one shuffleMaster instance. If we have multiple ShuffleMaster instances, we
might also need different ShuffleService factories.
And it seems that different ShuffleMaster instances could run in different
components based on demands, e.g. dispatcher, JM, RM.

Is it also feasible to not touch the ShuffleMaster concept in this FLIP to
make things a bit easy? I mean the ShuffleMaster is still running in JM
component and is responsible for job partitions. For the case of cluster

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-13 Thread Till Rohrmann
I think we won't necessarily run multiple ShuffleMasters. I think it would
be better to pass a leaner interface to the RM to only handle the
deletion of the global result partitions.

Letting the TEs handle the deletion of the global result partitions might
work as long as we don't have an external shuffle service implementation.
Hence, it could be a first step to decrease complexity but in order to
complete this feature, I think we need to do it differently.

Cheers,
Till

On Sat, Oct 12, 2019 at 7:39 AM zhijiang 
wrote:

> Sorry for the delay in catching up with the recent progress. Thanks for the FLIP
> update and valuable discussions!
>
> I also like the term job/cluster partitions, and agree with most of the
> previous comments.
>
> Only one concern left on the ShuffleMaster side:
> >However, if the separation of JM/RM into separate processes, as outlined
> in FLIP-6, is ever fully realized it necessarily implies that multiple
> shuffle master instances may exist for a given shuffle service.
>
> My previous thought was that one ShuffleService factory is for creating
> one shuffleMaster instance. If we have multiple ShuffleMaster instances, we
> might also need different ShuffleService factories.
> And it seems that different ShuffleMaster instances could run in different
> components based on demands, e.g. dispatcher, JM, RM.
>
> Is it also feasible to not touch the ShuffleMaster concept in this FLIP to
> make things a bit easy? I mean the ShuffleMaster is still running in JM
> component and is responsible for job partitions. For the case of cluster
> partitions, the RM could interact with TE directly. TE would report global
> partitions as payloads via heartbeat with RM. And the RM could call
> TE#releaseGlobalPartitions directly not via ShuffleMaster.  Even the RM
> could also pass the global released partitions via payloads in heartbeat
> with TE to reduce an additional explicit RPC call, but this would bring some
> delays for releasing partition based on heartbeat interval.
>
> Best,
> Zhijiang
> --
> From:Chesnay Schepler 
> Send Time:2019年10月11日(星期五) 10:21
> To:dev ; Till Rohrmann 
> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
>
> I like job-/cluster partitions.
>
> On 10/10/2019 16:27, Till Rohrmann wrote:
> > I think we should introduce a separate interface for the ResourceManager
> so
> > that it can list and delete global result partitions from the shuffle
> > service implementation. As long as the JM and RM run in the same process,
> > this interface could be implemented by the ShuffleMaster implementations.
> > However, we should make sure that we don't introduce unnecessary
> > concurrency. If that should be the case, then it might be simpler to have
> > two separate components.
> >
> > Some ideas for the naming problem:
> >
> > local/global: job/cluster, intra/inter
> >
> > Cheers,
> > Till
> >
> > On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler 
> wrote:
> >
> >> Are there any other opinions in regards to the naming scheme?
> >> (local/global, promote)
> >>
> >> On 06/09/2019 15:16, Chesnay Schepler wrote:
> >>> Hello,
> >>>
> >>> FLIP-36 (interactive programming)
> >>> <
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> >
> >>
> >>> proposes a new programming paradigm where jobs are built incrementally
> >>> by the user.
> >>>
> >>> To support this in an efficient manner I propose to extend partition
> >>> life-cycle to support the notion of /global partitions/, which are
> >>> partitions that can exist beyond the life-time of a job.
> >>>
> >>> These partitions could then be re-used by subsequent jobs in a fairly
> >>> efficient manner, as they don't have to be persisted to an external
> >>> storage first and consuming tasks could be scheduled to exploit
> >>> data-locality.
> >>>
> >>> The FLIP outlines the required changes on the JobMaster, TaskExecutor
> >>> and ResourceManager to support this from a life-cycle perspective.
> >>>
> >>> This FLIP does /not/ concern itself with the /usage/ of global
> >>> partitions, including client-side APIs, job-submission, scheduling and
> >>> reading said partitions; these are all follow-ups that will either be
> >>> part of FLIP-36 or spliced out into separate FLIPs.
> >>>
> >>>
> >>
>
>


Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-11 Thread zhijiang
Sorry for the delay in catching up with the recent progress. Thanks for the FLIP 
update and valuable discussions!

I also like the term job/cluster partitions, and agree with most of the 
previous comments.

Only one concern left on the ShuffleMaster side:
>However, if the separation of JM/RM into separate processes, as outlined in 
>FLIP-6, is ever fully realized it necessarily implies that multiple shuffle 
>master instances may exist for a given shuffle service.

My previous thought was that one ShuffleService factory is for creating one 
shuffleMaster instance. If we have multiple ShuffleMaster instances, we might 
also need different ShuffleService factories.
And it seems that different ShuffleMaster instances could run in different 
components based on demands, e.g. dispatcher, JM, RM. 

Is it also feasible to not touch the ShuffleMaster concept in this FLIP to make 
things a bit easy? I mean the ShuffleMaster is still running in JM component 
and is responsible for job partitions. For the case of cluster partitions, the 
RM could interact with TE directly. TE would report global partitions as 
payloads via heartbeat with RM. And the RM could call 
TE#releaseGlobalPartitions directly not via ShuffleMaster.  Even the RM could 
also pass the global released partitions via payloads in heartbeat with TE to 
reduce an additional explicit RPC call, but this would bring some delays for 
releasing partition based on heartbeat interval.

Best,
Zhijiang
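
A minimal Java sketch of the proposal above (all type and method names are
illustrative assumptions, not actual Flink API): the TE reports the cluster
(global) partitions it hosts as a heartbeat payload, and the RM releases
them by calling the TE directly, bypassing the ShuffleMaster.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical TE-facing gateway: heartbeat payload plus an explicit
// release RPC, so release latency is independent of the heartbeat interval.
interface TaskExecutorGateway {
    // Payload attached to the TE -> RM heartbeat.
    Set<String> getHostedGlobalPartitions();

    // Explicit RPC called by the RM to release global partitions.
    void releaseGlobalPartitions(Set<String> partitionIds);
}

class SimpleTaskExecutor implements TaskExecutorGateway {
    private final Set<String> globalPartitions = new HashSet<>();

    // Called when a finished job promotes a partition to "global".
    void promotePartition(String partitionId) {
        globalPartitions.add(partitionId);
    }

    @Override
    public Set<String> getHostedGlobalPartitions() {
        return Collections.unmodifiableSet(new HashSet<>(globalPartitions));
    }

    @Override
    public void releaseGlobalPartitions(Set<String> partitionIds) {
        globalPartitions.removeAll(partitionIds);
    }
}
```

The piggy-backed release-via-heartbeat variant mentioned above would drop the
explicit RPC and instead carry the to-be-released IDs in the RM's heartbeat
response, trading an RPC for release delay.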
--
From:Chesnay Schepler 
Send Time:2019年10月11日(星期五) 10:21
To:dev ; Till Rohrmann 
Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle

I like job-/cluster partitions.

On 10/10/2019 16:27, Till Rohrmann wrote:
> I think we should introduce a separate interface for the ResourceManager so
> that it can list and delete global result partitions from the shuffle
> service implementation. As long as the JM and RM run in the same process,
> this interface could be implemented by the ShuffleMaster implementations.
> However, we should make sure that we don't introduce unnecessary
> concurrency. If that should be the case, then it might be simpler to have
> two separate components.
>
> Some ideas for the naming problem:
>
> local/global: job/cluster, intra/inter
>
> Cheers,
> Till
>
> On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler  wrote:
>
>> Are there any other opinions in regards to the naming scheme?
>> (local/global, promote)
>>
>> On 06/09/2019 15:16, Chesnay Schepler wrote:
>>> Hello,
>>>
>>> FLIP-36 (interactive programming)
>>> <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>
>>
>>> proposes a new programming paradigm where jobs are built incrementally
>>> by the user.
>>>
>>> To support this in an efficient manner I propose to extend partition
>>> life-cycle to support the notion of /global partitions/, which are
>>> partitions that can exist beyond the life-time of a job.
>>>
>>> These partitions could then be re-used by subsequent jobs in a fairly
>>> efficient manner, as they don't have to be persisted to an external
>>> storage first and consuming tasks could be scheduled to exploit
>>> data-locality.
>>>
>>> The FLIP outlines the required changes on the JobMaster, TaskExecutor
>>> and ResourceManager to support this from a life-cycle perspective.
>>>
>>> This FLIP does /not/ concern itself with the /usage/ of global
>>> partitions, including client-side APIs, job-submission, scheduling and
>>> reading said partitions; these are all follow-ups that will either be
>>> part of FLIP-36 or spliced out into separate FLIPs.
>>>
>>>
>>



Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-11 Thread Chesnay Schepler

I like job-/cluster partitions.

On 10/10/2019 16:27, Till Rohrmann wrote:

I think we should introduce a separate interface for the ResourceManager so
that it can list and delete global result partitions from the shuffle
service implementation. As long as the JM and RM run in the same process,
this interface could be implemented by the ShuffleMaster implementations.
However, we should make sure that we don't introduce unnecessary
concurrency. If that should be the case, then it might be simpler to have
two separate components.

Some ideas for the naming problem:

local/global: job/cluster, intra/inter

Cheers,
Till

On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler  wrote:


Are there any other opinions in regards to the naming scheme?
(local/global, promote)

On 06/09/2019 15:16, Chesnay Schepler wrote:

Hello,

FLIP-36 (interactive programming)
<

https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>


proposes a new programming paradigm where jobs are built incrementally
by the user.

To support this in an efficient manner I propose to extend partition
life-cycle to support the notion of /global partitions/, which are
partitions that can exist beyond the life-time of a job.

These partitions could then be re-used by subsequent jobs in a fairly
efficient manner, as they don't have to be persisted to an external 
storage first and consuming tasks could be scheduled to exploit
data-locality.

The FLIP outlines the required changes on the JobMaster, TaskExecutor
and ResourceManager to support this from a life-cycle perspective.

This FLIP does /not/ concern itself with the /usage/ of global
partitions, including client-side APIs, job-submission, scheduling and
reading said partitions; these are all follow-ups that will either be
part of FLIP-36 or spliced out into separate FLIPs.








Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-10 Thread Till Rohrmann
I think we should introduce a separate interface for the ResourceManager so
that it can list and delete global result partitions from the shuffle
service implementation. As long as the JM and RM run in the same process,
this interface could be implemented by the ShuffleMaster implementations.
However, we should make sure that we don't introduce unnecessary
concurrency. If that should be the case, then it might be simpler to have
two separate components.

Some ideas for the naming problem:

local/global: job/cluster, intra/inter

Cheers,
Till
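
As a rough Java sketch of this separate interface (names are illustrative
assumptions, not actual Flink API), the RM would only see list/delete
operations; while JM and RM share a process, the existing ShuffleMaster
implementation could simply implement it as well:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical RM-facing interface: only listing and releasing
// global result partitions, nothing else from the shuffle service.
interface GlobalResultPartitionManager {
    CompletableFuture<Set<String>> listGlobalPartitions();

    CompletableFuture<Void> releaseGlobalPartition(String partitionId);
}

// In-memory example implementation for illustration.
class InMemoryGlobalResultPartitionManager implements GlobalResultPartitionManager {
    private final Set<String> partitions = new HashSet<>();

    void register(String partitionId) {
        partitions.add(partitionId);
    }

    @Override
    public CompletableFuture<Set<String>> listGlobalPartitions() {
        return CompletableFuture.completedFuture(new HashSet<>(partitions));
    }

    @Override
    public CompletableFuture<Void> releaseGlobalPartition(String partitionId) {
        partitions.remove(partitionId);
        return CompletableFuture.completedFuture(null);
    }
}
```

Returning futures keeps the interface RPC-friendly should the RM ever be
split into its own process.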

On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler  wrote:

> Are there any other opinions in regards to the naming scheme?
> (local/global, promote)
>
> On 06/09/2019 15:16, Chesnay Schepler wrote:
> > Hello,
> >
> > FLIP-36 (interactive programming)
> > <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>
>
> > proposes a new programming paradigm where jobs are built incrementally
> > by the user.
> >
> > To support this in an efficient manner I propose to extend partition
> > life-cycle to support the notion of /global partitions/, which are
> > partitions that can exist beyond the life-time of a job.
> >
> > These partitions could then be re-used by subsequent jobs in a fairly
> > efficient manner, as they don't have to be persisted to an external
> > storage first and consuming tasks could be scheduled to exploit
> > data-locality.
> >
> > The FLIP outlines the required changes on the JobMaster, TaskExecutor
> > and ResourceManager to support this from a life-cycle perspective.
> >
> > This FLIP does /not/ concern itself with the /usage/ of global
> > partitions, including client-side APIs, job-submission, scheduling and
> > reading said partitions; these are all follow-ups that will either be
> > part of FLIP-36 or spliced out into separate FLIPs.
> >
> >
>
>


Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-09 Thread Chesnay Schepler
Are there any other opinions in regards to the naming scheme? 
(local/global, promote)


On 06/09/2019 15:16, Chesnay Schepler wrote:

Hello,

FLIP-36 (interactive programming) 
 
proposes a new programming paradigm where jobs are built incrementally 
by the user.


To support this in an efficient manner I propose to extend partition 
life-cycle to support the notion of /global partitions/, which are 
partitions that can exist beyond the life-time of a job.


These partitions could then be re-used by subsequent jobs in a fairly 
efficient manner, as they don't have to be persisted to an external 
storage first and consuming tasks could be scheduled to exploit 
data-locality.


The FLIP outlines the required changes on the JobMaster, TaskExecutor 
and ResourceManager to support this from a life-cycle perspective.


This FLIP does /not/ concern itself with the /usage/ of global 
partitions, including client-side APIs, job-submission, scheduling and 
reading said partitions; these are all follow-ups that will either be 
part of FLIP-36 or spliced out into separate FLIPs.







Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-09 Thread Chesnay Schepler
While we could argue that it's a new interface so we aren't /technically/
changing anything about the ShuffleMaster, I'd assume most people would 
just have the ShuffleMaster implement the new interface and call it a day.


On 09/10/2019 09:57, Chesnay Schepler wrote:
So should we enforce having 2 instances now or defer this to a later 
date?


I'd rather do this early since it changes 2 assumptions that 
ShuffleMaster can currently make:

- every partition release is preceded by a registration of said partition
- the release of partitions may rely on local data

On 04/10/2019 17:10, Till Rohrmann wrote:

Thanks for updating the FLIP.

I think the RM does not need to have access to a full-fledged ShuffleMaster
implementation. Instead it should be enough to give it a leaner interface
which only supports deleting result partitions and listing available global
partitions. This might entail that one will have a ShuffleMaster
implementation running on the Dispatcher and a
GlobalResultPartitionsShuffleMaster implementation running on the RM. Long
story short, if we separate the RM from the Dispatcher, then this might
entail that we will have two ShuffleMaster incarnations running in each
process.

Cheers,
Till

On Fri, Oct 4, 2019 at 3:34 PM Chesnay Schepler  
wrote:



I have updated the FLIP.

- consistently use "local"/"global" terminology; this incidentally should
make it easier to update the terminology if we decide on other names
- inform RM via heartbeats from TE about available global partitions
- add dedicated method for releasing global partitions
- add dedicated section for required changes to the ShuffleMaster (mostly
clarification)
- added some items to the "Rejected Alternatives" section
- updated discussion link


While writing the ShuffleMaster section I noticed the following:

If, at any point, the JM/RM are moved into dedicated processes we either
a) have multiple ShuffleMaster instances for the same shuffle service active
b) require a single ShuffleMaster on the RM, to which JM calls are being
forwarded.

Neither of these is without pain-points;
a) introduces additional constraints on ShuffleMaster implementations in
that no local state must be kept
b) again forces the JM to regularly be in touch with the RM, and limits
the ShuffleMaster interface to being RPC-friendly.

I'm wondering whether this issue was already on anyone's radar.


On 04/10/2019 14:12, Till Rohrmann wrote:



On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler 
wrote:

*Till: In the FLIP you wrote "The set of partitions to release may contain local
and/or global partitions; the promotion set must only refer to local
partitions." to describe the `releasePartitions`. I think the JM should
never be in the situation to release a global partition. Moreover, I
believe we should have a separate RPC to release global result partitions
which might come from the RM.*

We can certainly add a separate RPC method for explicitly releasing 
global partitions.
You are correct that the JM should not be able to release those, 
just like the RM should not be able to release non-global partitions.
*Till: Once the JM has obtained the required slots to run a job, it no longer
needs to communicate with the RM. Hence, a lost RM connection won't
interfere with the job. I would like to keep it like this by letting the TE
announce global result partitions to the RM and not to introduce another
communication roundtrip.*

Agreed, this is a nice property to retain.

*Till: How big do you expect the payload to become?*

I don't know, which is precisely why I want to be cautious about it.
The last time I made a similar assumption I didn't expect anyone to 
have hundreds of thousands of metrics on a single TM, which turned 
out to be wrong.
I wouldn't exclude the possibility of a similar number of 
partitions being hosted on a single TE.



One problem we have to solve with the heartbeat-based approach is 
that partitions may be lost without the TE noticing, due to 
disk-failures or external delete operations.
Currently, for scheduling purposes we rely on information stored in 
the JM, and update said information if a job fails due to a missing 
partition. However, IIRC the JM is informed via an exception 
that is thrown by the consumer of said partition, not the producer. 
As far as the producing TM is concerned, it is still hosting that 
partition.
This means we have to forward errors for missing partitions from 
the network stack on the producers side to the TE, so that it can 
inform the RM about it.



Yes, I think you are right Chesnay. This would also be a good 
addition for

the local result partitions.

Cheers,
Till


On 02/10/2019 16:21, Till Rohrmann wrote:

Thanks for addressing our comments Chesnay. See some comments inline.

On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler 
  wrote:



Thank you for your comments; I've aggregated them a bit and added
comments to each of them.

1) Concept name (proposal: persistent)

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-09 Thread Chesnay Schepler

So should we enforce having 2 instances now or defer this to a later date?

I'd rather do this early since it changes 2 assumptions that 
ShuffleMaster can currently make:

- every partition release is preceded by a registration of said partition
- the release of partitions may rely on local data

On 04/10/2019 17:10, Till Rohrmann wrote:

Thanks for updating the FLIP.

I think the RM does not need to have access to a full-fledged ShuffleMaster
implementation. Instead it should be enough to give it a leaner interface
which only supports deleting result partitions and listing available global
partitions. This might entail that one will have a ShuffleMaster
implementation running on the Dispatcher and a
GlobalResultPartitionsShuffleMaster implementation running on the RM. Long
story short, if we separate the RM from the Dispatcher, then this might
entail that we will have two ShuffleMaster incarnations running in each
process.

Cheers,
Till

On Fri, Oct 4, 2019 at 3:34 PM Chesnay Schepler  wrote:


I have updated the FLIP.

- consistently use "local"/"global" terminology; this incidentally should
make it easier to update the terminology if we decide on other names
- inform RM via heartbeats from TE about available global partitions
- add dedicated method for releasing global partitions
- add dedicated section for required changes to the ShuffleMaster (mostly
clarification)
- added some items to the "Rejected Alternatives" section
- updated discussion link


While writing the ShuffleMaster section I noticed the following:

If, at any point, the JM/RM are moved into dedicated processes we either
a) have multiple ShuffleMaster instances for the same shuffle service
active
b) require a single ShuffleMaster on the RM, to which JM calls are being
forwarded.

Neither of these are without pain-points;
a) introduces additional constraints on ShuffleMaster implementations in
that no local state must be kept
b) again forces the JM to regularly be in touch with the RM, and limits
the ShuffleMaster interface to being RPC-friendly.

I'm wondering whether this issue was already on anyone's radar.


On 04/10/2019 14:12, Till Rohrmann wrote:



On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler 
wrote:


*Till: In the FLIP you wrote "The set of partitions to release may contain local
and/or global partitions; the promotion set must only refer to local
partitions." to describe the `releasePartitions`. I think the JM should
never be in the situation to release a global partition. Moreover, I
believe we should have a separate RPC to release global result partitions
which might come from the RM.*

We can certainly add a separate RPC method for explicitly releasing global 
partitions.
You are correct that the JM should not be able to release those, just like the 
RM should not be able to release non-global partitions.
*Till: Once the JM has obtained the required slots to run a job, it no longer
needs to communicate with the RM. Hence, a lost RM connection won't
interfere with the job. I would like to keep it like this by letting the TE
announce global result partitions to the RM and not to introduce another
communication roundtrip.*

Agreed, this is a nice property to retain.

*Till: How big do you expect the payload to become?*

I don't know, which is precisely why I want to be cautious about it.
The last time I made a similar assumption I didn't expect anyone to have 
hundreds of thousands of metrics on a single TM, which turned out to be wrong.
I wouldn't exclude the possibility of a similar number of partitions being 
hosted on a single TE.


One problem we have to solve with the heartbeat-based approach is that 
partitions may be lost without the TE noticing, due to disk-failures or 
external delete operations.
Currently, for scheduling purposes we rely on information stored in the JM, and 
update said information if a job fails due to a missing partition. However, 
IIRC the JM is informed via an exception that is thrown by the consumer 
of said partition, not the producer. As far as the producing TM is concerned, 
it is still hosting that partition.
This means we have to forward errors for missing partitions from the network 
stack on the producers side to the TE, so that it can inform the RM about it.



Yes, I think you are right Chesnay. This would also be a good addition for
the local result partitions.

Cheers,
Till


On 02/10/2019 16:21, Till Rohrmann wrote:

Thanks for addressing our comments Chesnay. See some comments inline.

On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler  
 wrote:


Thank you for your comments; I've aggregated them a bit and added
comments to each of them.

1) Concept name (proposal: persistent)

I agree that "global" is rather undescriptive, particularly so since we
never had a notion of "local" partitions.
I'm not a fan of "persistent"; as to me this always implies reliable
long-term storage which as I understand we aren't shooting for here.

I was thinking of "cached" partitions.

To 

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-04 Thread Till Rohrmann
Thanks for updating the FLIP.

I think the RM does not need to have access to a full-fledged ShuffleMaster
implementation. Instead it should be enough to give it a leaner interface
which only supports deleting result partitions and listing available global
partitions. This might entail that one will have a ShuffleMaster
implementation running on the Dispatcher and a
GlobalResultPartitionsShuffleMaster implementation running on the RM. Long
story short, if we separate the RM from the Dispatcher, then this might
entail that we will have two ShuffleMaster incarnations running in each
process.

Cheers,
Till

On Fri, Oct 4, 2019 at 3:34 PM Chesnay Schepler  wrote:

> I have updated the FLIP.
>
> - consistently use "local"/"global" terminology; this incidentally should
> make it easier to update the terminology if we decide on other names
> - inform RM via heartbeats from TE about available global partitions
> - add dedicated method for releasing global partitions
> - add dedicated section for required changes to the ShuffleMaster (mostly
> clarification)
> - added some items to the "Rejected Alternatives" section
> - updated discussion link
>
>
> While writing the ShuffleMaster section I noticed the following:
>
> If, at any point, the JM/RM are moved into dedicated processes we either
> a) have multiple ShuffleMaster instances for the same shuffle service
> active
> b) require a single ShuffleMaster on the RM, to which JM calls are being
> forwarded.
>
> Neither of these are without pain-points;
> a) introduces additional constraints on ShuffleMaster implementations in
> that no local state must be kept
> b) again forces the JM to regularly be in touch with the RM, and limits
> the ShuffleMaster interface to being RPC-friendly.
>
> I'm wondering whether this issue was already on anyone's radar.
>
>
> On 04/10/2019 14:12, Till Rohrmann wrote:
>
>
>
> On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler 
> wrote:
>
>> *Till: In the FLIP you wrote "The set of partitions to release may contain 
>> local
>> and/or global partitions; the promotion set must only refer to local
>> partitions." to describe the `releasePartitions`. I think the JM should
>> never be in the situation to release a global partition. Moreover, I
>> believe we should have a separate RPC to release global result partitions
>> which might come from the RM.*
>>
>> We can certainly add a separate RPC method for explicitly releasing global 
>> partitions.
>> You are correct that the JM should not be able to release those, just like 
>> the RM should not be able to release non-global partitions.
>> *Till: Once the JM has obtained the required slots to run a job, it no longer
>> needs to communicate with the RM. Hence, a lost RM connection won't
>> interfere with the job. I would like to keep it like this by letting the TE
>> announce global result partitions to the RM and not to introduce another
>> communication roundtrip.*
>>
>> Agreed, this is a nice property to retain.
>>
>> *Till: How big do you expect the payload to become?*
>>
>> I don't know, which is precisely why I want to be cautious about it.
>> The last time I made a similar assumption I didn't expect anyone to have 
>> hundreds of thousands of metrics on a single TM, which turned out to be 
>> wrong.
>> I wouldn't exclude the possibility of a similar number of partitions being 
>> hosted on a single TE.
>>
>>
>> One problem we have to solve with the heartbeat-based approach is that 
>> partitions may be lost without the TE noticing, due to disk-failures or 
>> external delete operations.
>> Currently, for scheduling purposes we rely on information stored in the JM, 
>> and update said information if a job fails due to a missing partition. 
>> However, IIRC the JM is informed via an exception that is thrown by 
>> the consumer of said partition, not the producer. As far as the producing TM 
>> is concerned, it is still hosting that partition.
>> This means we have to forward errors for missing partitions from the network 
>> stack on the producers side to the TE, so that it can inform the RM about it.
>>
>>
> Yes, I think you are right Chesnay. This would also be a good addition for
> the local result partitions.
>
> Cheers,
> Till
>
>> On 02/10/2019 16:21, Till Rohrmann wrote:
>>
>> Thanks for addressing our comments Chesnay. See some comments inline.
>>
>> On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler  
>>  wrote:
>>
>>
>> Thank you for your comments; I've aggregated them a bit and added
>> comments to each of them.
>>
>> 1) Concept name (proposal: persistent)
>>
>> I agree that "global" is rather undescriptive, particularly so since we
>> never had a notion of "local" partitions.
>> I'm not a fan of "persistent"; as to me this always implies reliable
>> long-term storage which as I understand we aren't shooting for here.
>>
>> I was thinking of "cached" partitions.
>>
>> To Zhijiangs point, we should of course make the naming consistent
>> everywhere.
>>
>> 2) Naming of last parameter 

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-04 Thread Chesnay Schepler

I have updated the FLIP.

- consistently use "local"/"global" terminology; this incidentally 
should make it easier to update the terminology if we decide on other names

- inform RM via heartbeats from TE about available global partitions
- add dedicated method for releasing global partitions
- add dedicated section for required changes to the ShuffleMaster 
(mostly clarification)

- added some items to the "Rejected Alternatives" section
- updated discussion link


While writing the ShuffleMaster section I noticed the following:

If, at any point, the JM/RM are moved into dedicated processes we either
a) have multiple ShuffleMaster instances for the same shuffle service active
b) require a single ShuffleMaster on the RM, to which JM calls are being 
forwarded.


Neither of these are without pain-points;
a) introduces additional constraints on ShuffleMaster implementations in 
that no local state must be kept
b) again forces the JM to regularly be in touch with the RM, and limits 
the ShuffleMaster interface to being RPC-friendly.


I'm wondering whether this issue was already on anyone's radar.


On 04/10/2019 14:12, Till Rohrmann wrote:



On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler > wrote:


/Till: In the FLIP you wrote "The set of partitions to release may
contain local and/or global partitions; the promotion set must
only refer to local partitions." to describe the
`releasePartitions`. I think the JM should never be in the
situation to release a global partition. Moreover, I believe we
should have a separate RPC to release global result partitions
which might come from the RM./

We can certainly add a separate RPC method for explicitly releasing global 
partitions.
You are correct that the JM should not be able to release those, just like 
the RM should not be able to release non-global partitions.

/Till: Once the JM has obtained the required slots to run a job,
it no longer needs to communicate with the RM. Hence, a lost RM
connection won't interfere with the job. I would like to keep it
like this by letting the TE announce global result partitions to
the RM and not to introduce another communication roundtrip./

Agreed, this is a nice property to retain.

/Till: How big do you expect the payload to become?/

I don't know, which is 
precisely why I want to be cautious about it.
The last time I made a similar assumption I didn't expect anyone to have 
hundreds of thousands of metrics on a single TM, which turned out to be wrong.
I wouldn't exclude the possibility of a similar number of partitions being 
hosted on a single TE.


One problem we have to solve with the heartbeat-based approach is that 
partitions may be lost without the TE noticing, due to disk-failures or 
external delete operations.
Currently, for scheduling purposes we rely on information stored in the JM, 
and update said information if a job fails due to a missing partition. However, 
IIRC the JM is informed via an exception thrown by the consumer 
of said partition, not by the producer. As far as the producing TM is concerned, 
it is still hosting that partition.
This means we have to forward errors for missing partitions from the 
network stack on the producer's side to the TE, so that it can inform the RM 
about it.
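A minimal sketch of this error-forwarding idea. All class and method names here are illustrative placeholders, not Flink's actual API: the producer-side network stack reports a failed partition read to the TE, which drops the partition from its own bookkeeping and notifies the RM, so the RM stops advertising a partition that is gone.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: producer-side loss detection forwarded TE -> RM.
class PartitionLossForwarder {
    interface ResourceManagerGateway {
        void reportLostGlobalPartitions(String taskExecutorId, Set<String> partitionIds);
    }

    private final String taskExecutorId;
    private final ResourceManagerGateway rmGateway;
    private final Map<String, Boolean> hostedGlobalPartitions = new ConcurrentHashMap<>();

    PartitionLossForwarder(String taskExecutorId, ResourceManagerGateway rmGateway) {
        this.taskExecutorId = taskExecutorId;
        this.rmGateway = rmGateway;
    }

    void registerGlobalPartition(String partitionId) {
        hostedGlobalPartitions.put(partitionId, Boolean.TRUE);
    }

    /** Called from the producer-side network stack when serving a partition fails. */
    void onPartitionReadError(String partitionId) {
        if (hostedGlobalPartitions.remove(partitionId) != null) {
            // Only notify the RM about partitions the TE believed it was hosting;
            // otherwise the RM keeps advertising a partition that no longer exists.
            rmGateway.reportLostGlobalPartitions(taskExecutorId, Set.of(partitionId));
        }
    }

    boolean isHosting(String partitionId) {
        return hostedGlobalPartitions.containsKey(partitionId);
    }
}
```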


Yes, I think you are right Chesnay. This would also be a good addition 
for the local result partitions.


Cheers,
Till

On 02/10/2019 16:21, Till Rohrmann wrote:

Thanks for addressing our comments Chesnay. See some comments inline.

On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler  
  wrote:


Thank you for your comments; I've aggregated them a bit and added
comments to each of them.

1) Concept name (proposal: persistent)

I agree that "global" is rather undescriptive, particularly so since we
never had a notion of "local" partitions.
I'm not a fan of "persistent"; as to me this always implies reliable
long-term storage which as I understand we aren't shooting for here.

I was thinking of "cached" partitions.

To Zhijiangs point, we should of course make the naming consistent
everywhere.

2) Naming of last parameter of TE#releasePartitions (proposal:
partitionsToRetain / partitionsToPersistent)

I can see where you're coming from ("promote" is somewhat abstract), but
I think both suggestions have downsides.

"partitionsToPersistent" to me implies an additional write operation to
somewhere, but we aren't doing that.
"partitionsToRetain" kind of results in a redundancy with the other
argument since retaining is the opposite to releasing a partition; if I
want to retain a partition, why am I not just excluding it from the set
to release?

I quite like "promote" personally; we fundamentally change how the
lifecycle for these partitions work, and introducing new keywords 

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-04 Thread Till Rohrmann
On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler  wrote:

> *Till: In the FLIP you wrote "The set of partitions to release may contain 
> local
> and/or global partitions; the promotion set must only refer to local
> partitions." to describe the `releasePartitions`. I think the JM should
> never be in the situation to release a global partition. Moreover, I
> believe we should have a separate RPC to release global result partitions
> which might come from the RM.*
>
> We can certainly add a separate RPC method for explicitly releasing global 
> partitions.
> You are correct that the JM should not be able to release those, just like 
> the RM should not be able to release non-global partitions.
> *Till: Once the JM has obtained the required slots to run a job, it no longer
> needs to communicate with the RM. Hence, a lost RM connection won't
> interfere with the job. I would like to keep it like this by letting the TE
> announce global result partitions to the RM and not to introduce another
> communication roundtrip.
>
> *Agreed, this is a nice property to retain.
> *Till: How big do you expect the payload to become?
>
> *I don't know, which is precisely why I want to be cautious about it.
> The last time I made a similar assumption I didn't expect anyone to have 
> hundreds of thousands of metrics on a single TM, which turned out to be wrong.
> I wouldn't exclude the possibility of a similar number of partitions being 
> hosted on a single TE.
>
>
> One problem we have to solve with the heartbeat-based approach is that 
> partitions may be lost without the TE noticing, due to disk-failures or 
> external delete operations.
> Currently, for scheduling purposes we rely on information stored in the JM, 
> and update said information if a job fails due to a missing partition. 
> However, IIRC the JM is informed via an exception thrown by 
> the consumer of said partition, not by the producer. As far as the producing TM 
> is concerned, it is still hosting that partition.
> This means we have to forward errors for missing partitions from the network 
> stack on the producer's side to the TE, so that it can inform the RM about it.
>
>
Yes, I think you are right Chesnay. This would also be a good addition for
the local result partitions.

Cheers,
Till

>  On 02/10/2019 16:21, Till Rohrmann wrote:
>
> Thanks for addressing our comments Chesnay. See some comments inline.
>
> On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler  
>  wrote:
>
>
> Thank you for your comments; I've aggregated them a bit and added
> comments to each of them.
>
> 1) Concept name (proposal: persistent)
>
> I agree that "global" is rather undescriptive, particularly so since we
> never had a notion of "local" partitions.
> I'm not a fan of "persistent"; as to me this always implies reliable
> long-term storage which as I understand we aren't shooting for here.
>
> I was thinking of "cached" partitions.
>
> To Zhijiangs point, we should of course make the naming consistent
> everywhere.
>
> 2) Naming of last parameter of TE#releasePartitions (proposal:
> partitionsToRetain / partitionsToPersistent)
>
> I can see where you're coming from ("promote" is somewhat abstract), but
> I think both suggestions have downsides.
>
> "partitionsToPersistent" to me implies an additional write operation to
> somewhere, but we aren't doing that.
> "partitionsToRetain" kind of results in a redundancy with the other
> argument since retaining is the opposite to releasing a partition; if I
> want to retain a partition, why am I not just excluding it from the set
> to release?
>
> I quite like "promote" personally; we fundamentally change how the
> lifecycle for these partitions works, and introducing new keywords isn't
> inherently a bad choice.
>
> 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions;
> Note: addition of "OrPromote" is dependent on 2) )
>
> Good point.
>
> 4) /Till: I'm not sure whether partitionsToRelease should contain a
> global/persistent result partition id. I always thought that the user will
> be responsible for managing the lifecycle of a global/persistent
> result partition./
>
> @Till Please elaborate; which method/argument are you referring to?
>
>
> In the FLIP you wrote "The set of partitions to release may contain local
> and/or global partitions; the promotion set must only refer to local
> partitions." to describe the `releasePartitions`. I think the JM should
> never be in the situation to release a global partition. Moreover, I
> believe we should have a separate RPC to release global result partitions
> which might come from the RM.
>
>
> 4)/Dedicated PartitionTable for global partitions/
>
> Since there is only one RM for each TE a PartitionTable is unnecessary;
> a simple set will suffice.
> Alternatively, we could introduce such a dedicated set into the
> PartitionTable to keep these data-structures close.
>
> 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs
> 

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-04 Thread Chesnay Schepler
/Till: In the FLIP you wrote "The set of partitions to release may 
contain local and/or global partitions; the promotion set must only 
refer to local partitions." to describe the `releasePartitions`. I think 
the JM should never be in the situation to release a global partition. 
Moreover, I believe we should have a separate RPC to release global 
result partitions which might come from the RM./


We can certainly add a separate RPC method for explicitly releasing global 
partitions.
You are correct that the JM should not be able to release those, just like the 
RM should not be able to release non-global partitions.

/Till: Once the JM has obtained the required slots to run a job, it no 
longer needs to communicate with the RM. Hence, a lost RM connection 
won't interfere with the job. I would like to keep it like this by 
letting the TE announce global result partitions to the RM and not to 
introduce another communication roundtrip./

Agreed, this is a nice property to retain.


/Till: How big do you expect the payload to become?/

I don't know, which is 
precisely why I want to be cautious about it.
The last time I made a similar assumption I didn't expect anyone to have 
hundreds of thousands of metrics on a single TM, which turned out to be wrong.
I wouldn't exclude the possibility of a similar number of partitions being 
hosted on a single TE.


One problem we have to solve with the heartbeat-based approach is that 
partitions may be lost without the TE noticing, due to disk-failures or 
external delete operations.
Currently, for scheduling purposes we rely on information stored in the JM, and 
update said information if a job fails due to a missing partition. However, 
IIRC the JM is informed via an exception thrown by the consumer 
of said partition, not by the producer. As far as the producing TM is concerned, 
it is still hosting that partition.
This means we have to forward errors for missing partitions from the network 
stack on the producer's side to the TE, so that it can inform the RM about it.


On 02/10/2019 16:21, Till Rohrmann wrote:

Thanks for addressing our comments Chesnay. See some comments inline.

On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler  wrote:


Thank you for your comments; I've aggregated them a bit and added
comments to each of them.

1) Concept name (proposal: persistent)

I agree that "global" is rather undescriptive, particularly so since we
never had a notion of "local" partitions.
I'm not a fan of "persistent"; as to me this always implies reliable
long-term storage which as I understand we aren't shooting for here.

I was thinking of "cached" partitions.

To Zhijiangs point, we should of course make the naming consistent
everywhere.

2) Naming of last parameter of TE#releasePartitions (proposal:
partitionsToRetain / partitionsToPersistent)

I can see where you're coming from ("promote" is somewhat abstract), but
I think both suggestions have downsides.

"partitionsToPersistent" to me implies an additional write operation to
somewhere, but we aren't doing that.
"partitionsToRetain" kind of results in a redundancy with the other
argument since retaining is the opposite to releasing a partition; if I
want to retain a partition, why am I not just excluding it from the set
to release?

I quite like "promote" personally; we fundamentally change how the 
lifecycle for these partitions works, and introducing new keywords isn't 
inherently a bad choice.

3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions;
Note: addition of "OrPromote" is dependent on 2) )

Good point.

4) /Till: I'm not sure whether partitionsToRelease should contain a
global/persistent result partition id. I always thought that the user will
be responsible for managing the lifecycle of a global/persistent
result partition./

@Till Please elaborate; which method/argument are you referring to?


In the FLIP you wrote "The set of partitions to release may contain local
and/or global partitions; the promotion set must only refer to local
partitions." to describe the `releasePartitions`. I think the JM should
never be in the situation to release a global partition. Moreover, I
believe we should have a separate RPC to release global result partitions
which might come from the RM.


4)/Dedicated PartitionTable for global partitions/

Since there is only one RM for each TE a PartitionTable is unnecessary;
a simple set will suffice.
Alternatively, we could introduce such a dedicated set into the
PartitionTable to keep these data-structures close.

5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs
retain global partitions for successful jobs"/

Will fix it.

6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and
expected to interact with the JM before. Now the RM also needs to 
interact with the ShuffleMaster to release global partitions. Then it 
might be better to move ShuffleMaster outside of JM, and the lifecycle
of ShuffleMaster should 

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-02 Thread Till Rohrmann
Thanks for addressing our comments Chesnay. See some comments inline.

On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler  wrote:

> Thank you for your comments; I've aggregated them a bit and added
> comments to each of them.
>
> 1) Concept name (proposal: persistent)
>
> I agree that "global" is rather undescriptive, particularly so since we
> never had a notion of "local" partitions.
> I'm not a fan of "persistent"; as to me this always implies reliable
> long-term storage which as I understand we aren't shooting for here.
>
> I was thinking of "cached" partitions.
>
> To Zhijiangs point, we should of course make the naming consistent
> everywhere.
>
> 2) Naming of last parameter of TE#releasePartitions (proposal:
> partitionsToRetain / partitionsToPersistent)
>
> I can see where you're coming from ("promote" is somewhat abstract), but
> I think both suggestions have downsides.
>
> "partitionsToPersistent" to me implies an additional write operation to
> somewhere, but we aren't doing that.
> "partitionsToRetain" kind of results in a redundancy with the other
> argument since retaining is the opposite to releasing a partition; if I
> want to retain a partition, why am I not just excluding it from the set
> to release?
>
> I quite like "promote" personally; we fundamentally change how the
> lifecycle for these partitions works, and introducing new keywords isn't
> inherently a bad choice.
>
> 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions;
> Note: addition of "OrPromote" is dependent on 2) )
>
> Good point.
>
> 4) /Till: I'm not sure whether partitionsToRelease should contain a
> global/persistent result partition id. I always thought that the user will
> be responsible for managing the lifecycle of a global/persistent
> result partition./
>
> @Till Please elaborate; which method/argument are you referring to?
>

In the FLIP you wrote "The set of partitions to release may contain local
and/or global partitions; the promotion set must only refer to local
partitions." to describe the `releasePartitions`. I think the JM should
never be in the situation to release a global partition. Moreover, I
believe we should have a separate RPC to release global result partitions
which might come from the RM.

>
> 4)/Dedicated PartitionTable for global partitions/
>
> Since there is only one RM for each TE a PartitionTable is unnecessary;
> a simple set will suffice.
> Alternatively, we could introduce such a dedicated set into the
> PartitionTable to keep these data-structures close.
>
> 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs
> retain global partitions for successful jobs"/
>
> Will fix it.
>
> 6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and
> expected to interact with the JM before. Now the RM also needs to
> interact with the ShuffleMaster to release global partitions. Then it
> might be better to move ShuffleMaster outside of JM, and the lifecycle
> of ShuffleMaster should be consistent with RM./
>
> Yes, I alluded to this in the FLIP but should've been more explicit; the
> shuffle master must outlive the JM. This is somewhat tricky when
> considering the future a bit; if we assume that different jobs or even a
> single one can use different shuffle services, then we need a way to
> associate the partitions with the corresponding shuffle master. This
> will likely require the introduction of a ShuffleMasterID that is
> included in the ShuffleDescriptor.
>
> 7) Handover
>
> /Till: The handover logic between the JM and the RM for the global/persistent
> result partitions seems a bit brittle to me. What will happen if the JM
> cannot reach the RM? I think it would be better if the TM announces the
> global/persistent result partitions to the RM via its heartbeats. That way
> we don't rely on an established connection between the JM and RM and we
> keep the TM as the ground of truth. Moreover, the RM should simply forward
> the release calls to the TM without much internal logic./
>
> As for your question, if the JM cannot reach the RM the handover will
> fail, the JM will likely shutdown without promoting any partition and
> the TE will release all partitions.
> What is the defined behavior for the JM in case of the RM disconnect
> after a job has finished? Does it always/sometimes/never shutdown
> with/-out communicating the result to the client / updating HA data;
> or simply put, does the JM behave to the user as if nothing has happened
> in all cases?
>

Once the JM has obtained the required slots to run a job, it no longer
needs to communicate with the RM. Hence, a lost RM connection won't
interfere with the job. I would like to keep it like this by letting the TE
announce global result partitions to the RM and not to introduce another
communication roundtrip.

>
> A heartbeat-based approach is useful and can alleviate some failure
> cases (see below); but we need to make sure we don't exceed the akka

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-10-02 Thread Chesnay Schepler
Thank you for your comments; I've aggregated them a bit and added 
comments to each of them.


1) Concept name (proposal: persistent)

I agree that "global" is rather undescriptive, particularly so since we 
never had a notion of "local" partitions.
I'm not a fan of "persistent"; as to me this always implies reliable 
long-term storage which as I understand we aren't shooting for here.


I was thinking of "cached" partitions.

To Zhijiangs point, we should of course make the naming consistent 
everywhere.


2) Naming of last parameter of TE#releasePartitions (proposal: 
partitionsToRetain / partitionsToPersistent)


I can see where you're coming from ("promote" is somewhat abstract), but 
I think both suggestions have downsides.


"partitionsToPersistent" to me implies an additional write operation to 
somewhere, but we aren't doing that.
"partitionsToRetain" kind of results in a redundancy with the other 
argument since retaining is the opposite to releasing a partition; if I 
want to retain a partition, why am I not just excluding it from the set 
to release?


I quite like "promote" personally; we fundamentally change how the 
lifecycle for these partitions works, and introducing new keywords isn't 
inherently a bad choice.


3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions; 
Note: addition of "OrPromote" is dependent on 2) )


Good point.
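A sketch of the release-or-promote semantics discussed in 2) and 3), heavily simplified: plain strings instead of Flink's partition/job ID types, and without the JobID parameter of the actual RPC. The point it illustrates is that promotion only applies to local partitions, and this JM-facing call never touches global ones (those get a separate RM-driven RPC, as agreed above).

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the proposed TE#releaseOrPromotePartitions.
class TaskExecutorPartitions {
    final Set<String> localPartitions = new HashSet<>();
    final Set<String> globalPartitions = new HashSet<>();

    void releaseOrPromotePartitions(
            Collection<String> partitionsToRelease,
            Collection<String> partitionsToPromote) {
        // Per the FLIP discussion, the promotion set must only refer to
        // local partitions: promoting moves a partition from the job-scoped
        // set to the cluster-scoped set.
        for (String p : partitionsToPromote) {
            if (localPartitions.remove(p)) {
                globalPartitions.add(p);
            }
        }
        // Only local partitions are released here; global partitions are
        // released via a separate RPC coming from the RM.
        localPartitions.removeAll(partitionsToRelease);
    }
}
```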

4) /Till: I'm not sure whether partitionsToRelease should contain a
global/persistent result partition id. I always thought that the user will
be responsible for managing the lifecycle of a global/persistent
result partition./

@Till Please elaborate; which method/argument are you referring to?

4) /Dedicated PartitionTable for global partitions/

Since there is only one RM for each TE a PartitionTable is unnecessary; 
a simple set will suffice.
Alternatively, we could introduce such a dedicated set into the 
PartitionTable to keep these data-structures close.
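The alternative mentioned above could look roughly like the following sketch (illustrative names, plain strings instead of Flink's ID types): keep the per-job tracking of the existing PartitionTable, plus one flat set for global partitions, since with a single RM per TE no keying is needed on the global side.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a PartitionTable with a dedicated global set.
class PartitionTableSketch {
    // Local partitions are tracked per job, as in the existing PartitionTable.
    private final Map<String, Set<String>> localPartitionsPerJob = new HashMap<>();
    // Global partitions need no key: there is exactly one RM per TE.
    private final Set<String> globalPartitions = new HashSet<>();

    void startTrackingPartition(String jobId, String partitionId) {
        localPartitionsPerJob
                .computeIfAbsent(jobId, k -> new HashSet<>())
                .add(partitionId);
    }

    /** Promote job-scoped partitions into the cluster-scoped set. */
    void promoteToGlobal(String jobId, Collection<String> partitionIds) {
        Set<String> forJob =
                localPartitionsPerJob.getOrDefault(jobId, new HashSet<>());
        for (String p : partitionIds) {
            if (forJob.remove(p)) {
                globalPartitions.add(p);
            }
        }
    }

    Set<String> getGlobalPartitions() {
        return globalPartitions;
    }
}
```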


5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs 
retain global partitions for successful jobs"/


Will fix it.

6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and 
expected to interact with the JM before. Now the RM also needs to 
interact with the ShuffleMaster to release global partitions. Then it 
might be better to move ShuffleMaster outside of JM, and the lifecycle 
of ShuffleMaster should be consistent with RM./


Yes, I alluded to this in the FLIP but should've been more explicit; the 
shuffle master must outlive the JM. This is somewhat tricky when 
considering the future a bit; if we assume that different jobs or even a 
single one can use different shuffle services, then we need a way to 
associate the partitions with the corresponding shuffle master. This 
will likely require the introduction of a ShuffleMasterID that is 
included in the ShuffleDescriptor.
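A sketch of how such a ShuffleMasterID could be used for routing. Everything here is hypothetical (the names `ShuffleDescriptorSketch`, `releasePartitionExternally`, etc. are not actual Flink API): each descriptor carries the id of the ShuffleMaster that produced the partition, so a later release call, e.g. from the RM, can be routed back to the right shuffle service.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: associating partitions with their ShuffleMaster.
class ShuffleMasterRegistry {
    /** Descriptor carrying the id of the ShuffleMaster that produced it. */
    static class ShuffleDescriptorSketch {
        final UUID shuffleMasterId;
        final String partitionId;

        ShuffleDescriptorSketch(UUID shuffleMasterId, String partitionId) {
            this.shuffleMasterId = shuffleMasterId;
            this.partitionId = partitionId;
        }
    }

    interface ShuffleMasterSketch {
        void releasePartitionExternally(String partitionId);
    }

    private final Map<UUID, ShuffleMasterSketch> mastersById = new HashMap<>();

    UUID register(ShuffleMasterSketch master) {
        UUID id = UUID.randomUUID(); // the ShuffleMasterID alluded to above
        mastersById.put(id, master);
        return id;
    }

    /** Route a release call back to whichever ShuffleMaster owns the partition. */
    boolean release(ShuffleDescriptorSketch descriptor) {
        ShuffleMasterSketch master = mastersById.get(descriptor.shuffleMasterId);
        if (master == null) {
            return false; // unknown shuffle service; caller must handle this
        }
        master.releasePartitionExternally(descriptor.partitionId);
        return true;
    }
}
```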


7) Handover

/Till: The handover logic between the JM and the RM for the 
global/persistent//

//result partitions seems a bit brittle to me. What will happen if the JM//
//cannot reach the RM? I think it would be better if the TM announces the//
//global/persistent result partitions to the RM via its heartbeats. That 
way//

//we don't rely on an established connection between the JM and RM and we//
//keep the TM as the ground of truth. Moreover, the RM should simply 
forward//

//the release calls to the TM without much internal logic./

As for your question, if the JM cannot reach the RM the handover will 
fail, the JM will likely shutdown without promoting any partition and 
the TE will release all partitions.
What is the defined behavior for the JM in case of the RM disconnect 
after a job has finished? Does it always/sometimes/never shutdown 
with/-out communicating the result to the client / updating HA data;
or simply put, does the JM behave to the user as if nothing has happened 
in all cases?


A heartbeat-based approach is useful and can alleviate some failure 
cases (see below); but we need to make sure we don't exceed the akka 
framesize or otherwise interfere with the heartbeat mechanism (like we 
did with metrics in the past). Ideally we would only submit updates to 
the partition set (added/removed partitions), but I'm not sure if the 
heartbeats are reliable enough for this to work.
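The delta idea above could be sketched as follows (purely illustrative, not Flink code): instead of shipping the full set of hosted global partitions on every heartbeat, which risks the framesize problem described, the TE accumulates additions and removals since the last beat and ships only those, with a full-sync fallback for when a beat may have been lost.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of a delta-based heartbeat payload for global partitions.
class GlobalPartitionHeartbeat {
    static class Payload {
        final Set<String> added;
        final Set<String> removed;
        final boolean fullSync;

        Payload(Set<String> added, Set<String> removed, boolean fullSync) {
            this.added = added;
            this.removed = removed;
            this.fullSync = fullSync;
        }
    }

    private final Set<String> hosted = new HashSet<>();
    private final Set<String> pendingAdded = new HashSet<>();
    private final Set<String> pendingRemoved = new HashSet<>();

    void promote(String partitionId) {
        hosted.add(partitionId);
        // An add cancels out an earlier, not-yet-reported remove, and vice versa.
        if (!pendingRemoved.remove(partitionId)) {
            pendingAdded.add(partitionId);
        }
    }

    void lost(String partitionId) {
        hosted.remove(partitionId);
        if (!pendingAdded.remove(partitionId)) {
            pendingRemoved.add(partitionId);
        }
    }

    /** Build the next payload; a full sync covers heartbeats that went missing. */
    Payload nextPayload(boolean requestFullSync) {
        if (requestFullSync) {
            pendingAdded.clear();
            pendingRemoved.clear();
            return new Payload(new HashSet<>(hosted), new HashSet<>(), true);
        }
        Payload p = new Payload(
                new HashSet<>(pendingAdded), new HashSet<>(pendingRemoved), false);
        pendingAdded.clear();
        pendingRemoved.clear();
        return p;
    }
}
```

Whether the heartbeats are reliable enough to carry deltas safely is exactly the open question raised above; the full-sync path is one way to recover if they are not.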


8. Failure cases:
/Becket:/
/a) The TEs may remove the result partition while the RM does not
know. In this case, the client will receive a runtime error and submit the
full DAG to recompute the missing result partition. In this case, RM should
release the incomplete global partition. How would RM be notified to do
that?
b) Is it possible the RM loses global partition metadata while
the TE still hosts the data? For example, RM deletes the global partition
entry while the release partition call to TE failed.
c) What would happen if the JM fails before the global partitions
are registered to RM? Are users exposed to resource leak if JM does 

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-09-30 Thread Becket Qin
Forgot to say that I agree with Till that it seems a good idea to let TEs
register the global partitions to the RM instead of letting JM do it. This
simplifies quite a few things.

Thanks,

Jiangjie (Becket) Qin

On Sun, Sep 29, 2019 at 11:25 PM Becket Qin  wrote:

> Hi Chesnay,
>
> Thanks for the proposal. My understanding of the entire workflow step by
> step is following:
>
>- JM maintains the local and global partition metadata when the task
> runs to create result partitions. The tasks themselves do not distinguish
> between local / global partitions. Only the JM knows that.
>- JM releases the local partitions as the job executes. When a job
> finishes successfully, JM registers the global partitions to the RM. The
> global partition IDs are set on the client instead of randomly generated,
> so the client can release global partitions using them. (It would be good
> to have some human readable string associated with the global result
> partitions).
>- Client issues REST call to list / release global partitions.
>
> A few thoughts / questions below:
> 1. Failure cases:
>   * The TEs may remove the result partition while the RM does not
> know. In this case, the client will receive a runtime error and submit the
> full DAG to recompute the missing result partition. In this case, RM should
> release the incomplete global partition. How would RM be notified to do
> that?
>   * Is it possible the RM loses global partition metadata while
> the TE still host the data? For example, RM deletes the global partition
> entry while the release partition call to TE failed.
>   * What would happen if the JM fails before the global partitions
> are registered to RM? Are users exposed to resource leak if JM does not
> have HA?
>   * What would happen if the RM fails? Will TE release the
> partitions by themselves?
>
> 2. It looks that TE should be the source of truth of the result partition
> existence. Does it have to distinguish between global and local result
> partitions? If TE does not need to distinguish them, it seems that the
> releasePartition() method in TE could just provide the list of partitions
> to release, without the partitions to promote.
>
> 3. In the current design, RM should be able to release result
> partitions using ShuffleService. Will RM do this by sending RPC to the TEs?
> Or will the RM do it by itself?
>
> 4. How do we plan to handle the case when there are different shuffle
> services in the same Flink cluster? For example, a shared standalone
> cluster.
>
> 5. Minor: usually REST API uses `?` to pass the parameters. Is there a
> reason we use `:` instead?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Sep 17, 2019 at 3:22 AM zhijiang
>  wrote:
>
>>
>> Thanks Chesnay for this FLIP and sorry for getting to it with a bit of delay on my
>> side.
>>
>> I also have some similar concerns which Till already proposed before.
>>
>> 1. The consistent terminology in different components. On JM side,
>> PartitionTracker#getPersistedBlockingPartitions is defined for getting
>> global partitions. And on RM side, we define the method of
>> #registerGlobalPartitions correspondingly for handing over the partitions from
>> the JM. I think it is better to unify the term across components for
>> better understanding of the semantics. Concerning whether to use global or
>> persistent, I prefer the "global" term personally, because it describes the
>> scope of partition clearly, and the "persistent" is more like the partition
>> storing way or implementation detail. In other words, the global partition
>> might also be cached in memory of TE, not must persist into files from
>> semantic requirements. Whether memory or persistent file is just the
>> implementation choice.
>>
>> 2. On TE side, we might rename the method #releasePartitions to
>> #releaseOrPromotePartitions which describes the function precisely and
>> keeps consistent with
>> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().
>>
>> 3. Very agree with Till's suggestions of global PartitionTable on TE side
>> and sticking to TE's heartbeat report to RM for global partitions.
>>
>> 4. Considering ShuffleMaster, it was built inside JM and expected to
>> interactive with JM before. Now the RM also needs to interactive with
>> ShuffleMaster to release global partitions. Then it might be better to move
>> ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should be
>> consistent with RM.
>>
>> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global
>> partitions for successful jobs"
>>
>> Best,
>> Zhijiang
>>
>>
>> --
>> From:Till Rohrmann 
>> Send Time:2019年9月10日(星期二) 10:10
>> To:dev 
>> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
>>
>> Thanks Chesnay for drafting the FLIP and starting this discussion.
>>
>> I have a couple of comments:
>>
>> * I know that I've also coined the terms global/local 

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-09-30 Thread Becket Qin
Hi Chesnay,

Thanks for the proposal. My understanding of the entire workflow step by
step is following:

   - JM maintains the local and global partition metadata when the task
runs to create result partitions. The tasks themselves do not distinguish
between local / global partitions. Only the JM knows that.
   - JM releases the local partitions as the job executes. When a job
finishes successfully, JM registers the global partitions to the RM. The
global partition IDs are set on the client instead of randomly generated,
so the client can release global partitions using them. (It would be good
to have some human readable string associated with the global result
partitions).
   - Client issues REST call to list / release global partitions.

A few thoughts / questions below:
1. Failure cases:
  * The TEs may remove the result partition while the RM does not
know. In this case, the client will receive a runtime error and submit the
full DAG to recompute the missing result partition. In this case, RM should
release the incomplete global partition. How would RM be notified to do
that?
  * Is it possible the RM loses global partition metadata while
the TE still host the data? For example, RM deletes the global partition
entry while the release partition call to TE failed.
  * What would happen if the JM fails before the global partitions
are registered to RM? Are users exposed to resource leak if JM does not
have HA?
  * What would happen if the RM fails? Will TE release the
partitions by themselves?

2. It looks that TE should be the source of truth of the result partition
existence. Does it have to distinguish between global and local result
partitions? If TE does not need to distinguish them, it seems that the
releasePartition() method in TE could just provide the list of partitions
to release, without the partitions to promote.

3. In the current design, RM should be able to release result
partitions using ShuffleService. Will RM do this by sending RPC to the TEs?
Or will the RM do it by itself?

4. How do we plan to handle the case when there are different shuffle
services in the same Flink cluster? For example, a shared standalone
cluster.

5. Minor: usually REST API uses `?` to pass the parameters. Is there a
reason we use `:` instead?

Thanks,

Jiangjie (Becket) Qin

On Tue, Sep 17, 2019 at 3:22 AM zhijiang 
wrote:

>
> Thanks Chesnay for this FLIP and sorry for getting to it with a bit of delay on my
> side.
>
> I also have some similar concerns which Till already proposed before.
>
> 1. The consistent terminology in different components. On JM side,
> PartitionTracker#getPersistedBlockingPartitions is defined for getting
> global partitions. And on RM side, we define the method of
> #registerGlobalPartitions correspondingly for handing over the partitions from
> the JM. I think it is better to unify the term across components for
> better understanding of the semantics. Concerning whether to use global or
> persistent, I prefer the "global" term personally, because it describes the
> scope of partition clearly, and the "persistent" is more like the partition
> storing way or implementation detail. In other words, the global partition
> might also be cached in memory of TE, not must persist into files from
> semantic requirements. Whether memory or persistent file is just the
> implementation choice.
>
> 2. On the TE side, we might rename the method #releasePartitions to
> #releaseOrPromotePartitions, which describes the function precisely and
> stays consistent with
> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().
>
> 3. I fully agree with Till's suggestions of a global PartitionTable on the
> TE side and sticking to the TE's heartbeat report to the RM for global
> partitions.
>
> 4. Considering the ShuffleMaster: it was previously built inside the JM
> and only expected to interact with the JM. Now the RM also needs to
> interact with the ShuffleMaster to release global partitions, so it might
> be better to move the ShuffleMaster out of the JM and keep its lifecycle
> consistent with the RM's.
>
> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global
> partitions for successful jobs"
>
> Best,
> Zhijiang
>
>
> --
> From:Till Rohrmann 
> Send Time:2019年9月10日(星期二) 10:10
> To:dev 
> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
>
> Thanks Chesnay for drafting the FLIP and starting this discussion.
>
> I have a couple of comments:
>
> * I know that I've also coined the terms global/local result partition but
> maybe it is not the perfect name. Maybe we could rethink the terminology
> and call them persistent result partitions?
> * Nit: I would call the last parameter of void releasePartitions(JobID
> jobId, Collection<ResultPartitionID> partitionsToRelease,
> Collection<ResultPartitionID> partitionsToPromote) either
> partitionsToRetain or partitionsToPersistent.
> * I'm not sure whether partitionsToRelease should contain a
> 

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-09-17 Thread zhijiang

Thanks Chesnay for this FLIP, and sorry for getting to it a bit late on my side.

I also have some concerns similar to those Till already raised.

1. Consistent terminology across components. On the JM side, 
PartitionTracker#getPersistedBlockingPartitions is defined for getting global 
partitions, while on the RM side we define the method #registerGlobalPartitions 
for handing over the partitions from the JM. It would be better to unify the 
term across components for better understanding of the semantics. Concerning 
whether to use "global" or "persistent", I personally prefer "global": it 
clearly describes the scope of the partition, whereas "persistent" reads more 
like a storage mechanism or implementation detail. In other words, a global 
partition might also be cached in the TE's memory rather than persisted to 
files; memory versus persistent files is just an implementation choice.

2. On the TE side, we might rename the method #releasePartitions to 
#releaseOrPromotePartitions, which describes the function precisely and stays 
consistent with PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().
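A minimal sketch of what the renamed TE-side method could look like (illustrative names only, not Flink's real classes): promotion moves a partition out of the job-scoped set so it survives job teardown, while release simply drops it.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: a TE-side store where one call both releases
// and promotes, mirroring the JM-side
// PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().
public class ReleaseOrPromoteSketch {
    private final Map<String, Set<String>> jobPartitions = new HashMap<>();
    private final Set<String> globalPartitions = new HashSet<>();

    void register(String jobId, String partitionId) {
        jobPartitions.computeIfAbsent(jobId, k -> new HashSet<>()).add(partitionId);
    }

    // The rename suggested above: release and promote in one RPC.
    void releaseOrPromotePartitions(String jobId,
                                    Collection<String> toRelease,
                                    Collection<String> toPromote) {
        Set<String> hosted = jobPartitions.getOrDefault(jobId, new HashSet<>());
        hosted.removeAll(toRelease);
        for (String p : toPromote) {
            if (hosted.remove(p)) {
                globalPartitions.add(p); // now outlives the producing job
            }
        }
    }

    Set<String> globalPartitions() {
        return globalPartitions;
    }

    public static void main(String[] args) {
        ReleaseOrPromoteSketch te = new ReleaseOrPromoteSketch();
        te.register("job-1", "p1");
        te.register("job-1", "p2");
        te.releaseOrPromotePartitions("job-1", List.of("p1"), List.of("p2"));
        System.out.println(te.globalPartitions()); // p2 promoted, p1 released
    }
}
```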

3. I fully agree with Till's suggestions of a global PartitionTable on the TE 
side and sticking to the TE's heartbeat report to the RM for global partitions.
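As a rough illustration of that suggestion (all names here are made up, not Flink's actual PartitionTable API), the TE could hold two explicit tables and move entries between them on promotion, rather than flagging entries inside one table:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Rough illustration only: a generic per-key table, loosely modeled on
// the idea of Flink's PartitionTable, and a TE state holder keeping two
// explicit instances instead of one table with a global/local flag.
class SimplePartitionTable<K> {
    private final Map<K, Set<String>> entries = new HashMap<>();

    void startTracking(K key, Collection<String> partitionIds) {
        entries.computeIfAbsent(key, k -> new HashSet<>()).addAll(partitionIds);
    }

    Set<String> stopTracking(K key) {
        Set<String> removed = entries.remove(key);
        return removed == null ? new HashSet<>() : removed;
    }

    boolean hasTracked(K key) {
        return entries.containsKey(key);
    }
}

public class TwoTablesSketch {
    final SimplePartitionTable<String> localTable = new SimplePartitionTable<>();
    final SimplePartitionTable<String> globalTable = new SimplePartitionTable<>();

    // Promotion moves a job's surviving partitions from the local table
    // to the global one, keeping the two life-cycles explicit.
    void promoteAll(String jobId) {
        globalTable.startTracking(jobId, localTable.stopTracking(jobId));
    }

    public static void main(String[] args) {
        TwoTablesSketch te = new TwoTablesSketch();
        te.localTable.startTracking("job-1", java.util.List.of("p1"));
        te.promoteAll("job-1");
        System.out.println(te.globalTable.hasTracked("job-1")); // true
    }
}
```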

4. Considering the ShuffleMaster: it was previously built inside the JM and 
only expected to interact with the JM. Now the RM also needs to interact with 
the ShuffleMaster to release global partitions, so it might be better to move 
the ShuffleMaster out of the JM and keep its lifecycle consistent with the RM's.

5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global 
partitions for successful jobs"

Best,
Zhijiang


--
From:Till Rohrmann 
Send Time:2019年9月10日(星期二) 10:10
To:dev 
Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle

Thanks Chesnay for drafting the FLIP and starting this discussion.

I have a couple of comments:

* I know that I've also coined the terms global/local result partition but
maybe it is not the perfect name. Maybe we could rethink the terminology
and call them persistent result partitions?
* Nit: I would call the last parameter of void releasePartitions(JobID
jobId, Collection<ResultPartitionID> partitionsToRelease,
Collection<ResultPartitionID> partitionsToPromote) either
partitionsToRetain or partitionsToPersistent.
* I'm not sure whether partitionsToRelease should contain a
global/persistent result partition id. I always thought that the user will
be responsible for managing the lifecycle of a global/persistent
result partition.
* Instead of extending the PartitionTable to be able to store
global/persistent and local/transient result partitions, I would rather
introduce a global PartitionTable to store the global/persistent result
partitions explicitly. I think there is a benefit in making things as
explicit as possible.
* The handover logic between the JM and the RM for the global/persistent
result partitions seems a bit brittle to me. What will happen if the JM
cannot reach the RM? I think it would be better if the TM announces the
global/persistent result partitions to the RM via its heartbeats. That way
we don't rely on an established connection between the JM and RM and we
keep the TM as the ground of truth. Moreover, the RM should simply forward
the release calls to the TM without much internal logic.

Cheers,
Till

On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler  wrote:

> Hello,
>
> FLIP-36 (interactive programming)
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>
>
> proposes a new programming paradigm where jobs are built incrementally
> by the user.
>
> To support this in an efficient manner I propose to extend partition
> life-cycle to support the notion of /global partitions/, which are
> partitions that can exist beyond the life-time of a job.
>
> These partitions could then be re-used by subsequent jobs in a fairly
> efficient manner, as they don't have to be persisted to external storage
> first and consuming tasks could be scheduled to exploit data-locality.
>
> The FLIP outlines the required changes on the JobMaster, TaskExecutor
> and ResourceManager to support this from a life-cycle perspective.
>
> This FLIP does /not/ concern itself with the /usage/ of global
> partitions, including client-side APIs, job-submission, scheduling and
> reading said partitions; these are all follow-ups that will either be
> part of FLIP-36 or spliced out into separate FLIPs.
>
>



Re: [DISCUSS] FLIP-67: Global partitions lifecycle

2019-09-10 Thread Till Rohrmann
Thanks Chesnay for drafting the FLIP and starting this discussion.

I have a couple of comments:

* I know that I've also coined the terms global/local result partition but
maybe it is not the perfect name. Maybe we could rethink the terminology
and call them persistent result partitions?
* Nit: I would call the last parameter of void releasePartitions(JobID
jobId, Collection<ResultPartitionID> partitionsToRelease,
Collection<ResultPartitionID> partitionsToPromote) either
partitionsToRetain or partitionsToPersistent.
* I'm not sure whether partitionsToRelease should contain a
global/persistent result partition id. I always thought that the user will
be responsible for managing the lifecycle of a global/persistent
result partition.
* Instead of extending the PartitionTable to be able to store
global/persistent and local/transient result partitions, I would rather
introduce a global PartitionTable to store the global/persistent result
partitions explicitly. I think there is a benefit in making things as
explicit as possible.
* The handover logic between the JM and the RM for the global/persistent
result partitions seems a bit brittle to me. What will happen if the JM
cannot reach the RM? I think it would be better if the TM announces the
global/persistent result partitions to the RM via its heartbeats. That way
we don't rely on an established connection between the JM and RM and we
keep the TM as the ground of truth. Moreover, the RM should simply forward
the release calls to the TM without much internal logic.
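The heartbeat idea above could look roughly like this (a sketch under made-up names, not Flink's actual heartbeat classes): each TM report is treated as authoritative, so the RM rebuilds its view from it and never depends on a JM-to-RM handover.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch with made-up names (not Flink's actual heartbeat classes): the
// TM's heartbeat payload lists the global partitions it currently hosts,
// so the RM's view is continuously rebuilt from the TM as the ground of
// truth.
public class HeartbeatSketch {
    record Payload(String taskManagerId, Set<String> hostedGlobalPartitions) {}

    static class ResourceManagerView {
        private final Map<String, Set<String>> byTaskManager = new HashMap<>();

        void onHeartbeat(Payload payload) {
            // Overwrite rather than merge: partitions the TM no longer
            // reports drop out of the RM's view automatically.
            byTaskManager.put(payload.taskManagerId(),
                    new HashSet<>(payload.hostedGlobalPartitions()));
        }

        Set<String> knownGlobalPartitions() {
            Set<String> all = new HashSet<>();
            byTaskManager.values().forEach(all::addAll);
            return all;
        }
    }

    public static void main(String[] args) {
        ResourceManagerView rm = new ResourceManagerView();
        rm.onHeartbeat(new Payload("tm-1", Set.of("p1", "p2")));
        rm.onHeartbeat(new Payload("tm-1", Set.of("p2"))); // p1 went away
        System.out.println(rm.knownGlobalPartitions()); // only p2 remains
    }
}
```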

Cheers,
Till

On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler  wrote:

> Hello,
>
> FLIP-36 (interactive programming)
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>
>
> proposes a new programming paradigm where jobs are built incrementally
> by the user.
>
> To support this in an efficient manner I propose to extend partition
> life-cycle to support the notion of /global partitions/, which are
> partitions that can exist beyond the life-time of a job.
>
> These partitions could then be re-used by subsequent jobs in a fairly
> efficient manner, as they don't have to be persisted to external storage
> first and consuming tasks could be scheduled to exploit data-locality.
>
> The FLIP outlines the required changes on the JobMaster, TaskExecutor
> and ResourceManager to support this from a life-cycle perspective.
>
> This FLIP does /not/ concern itself with the /usage/ of global
> partitions, including client-side APIs, job-submission, scheduling and
> reading said partitions; these are all follow-ups that will either be
> part of FLIP-36 or spliced out into separate FLIPs.
>
>