Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-23 Thread Yi Pan
@Dong, thanks for the updates.

+1

2017-06-23 Thread Navina Ramesh
Yi,
Thanks for summarizing. I think we should handle further code-related
changes/discussions in the PR directly, since this SEP has been open for a
while. Let's try to wrap up the discussions by today.

@Dong: Thanks for updating the SEP. I think the TestPlan section is TBD
right now. You can update it whenever you get to it. Thanks a bunch for
your patience!

Cheers!
Navina

2017-06-22 Thread Dong Lin
Hey Yi,

Thanks for the detailed comment and the summary!

To address your comments:

1) The current names are GroupByPartitionWithFixedTaskNum and
GroupBySystemStreamPartitionWithFixedTaskNum. Instead of
FixedTasksGroupByPartition and FixedTasksGroupBySystemStreamPartition, how
about GroupByPartitionFixedTasks and GroupBySystemStreamPartitionFixedTasks?
The new names are as long as the names you suggested, but they seem a bit
more intuitive because they are prefixed with the grouper class name of
their no-fixed-tasks counterparts. I have updated the wiki with the new
names. Can you let me know if that is OK?

2) Initially I wanted to design that config and interface later, once we
had more use cases, so that we could have higher confidence in the interface
design. But it seems that one common concern with the proposal is its
limiting assumption about the old-partition-to-new-partition mapping. I
have updated the wiki to illustrate the design of this interface and the
new (and more general) assumption an input system must satisfy to use this
partition expansion. Can you take a look and see if it is reasonable?

3) Yeah, Jacob raised the same concern previously, and the solution is
exactly what you suggested.

Hey everyone,

I have made non-trivial changes to the wiki to illustrate the use of the new
config and interface that let users specify the new-partition-to-old-partition
mapping. Can you please help review it?
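One possible shape for the pluggable new-partition-to-old-partition mapping described above. This is a Python sketch, not the interface actually written up on the wiki. `PartitionExpansionMapper`, `ModuloExpansionMapper` and `key_stays_on_same_task` are hypothetical names. The sketch assumes that keyed messages are routed by a hash-mod partitioner, as Kafka's default partitioner does for keyed messages. It also assumes that the default policy only allows expanding to a multiple of the old count, which covers power-of-2 expansion.

```python
from abc import ABC, abstractmethod


class PartitionExpansionMapper(ABC):
    """Hypothetical pluggable policy: map a partition of the expanded stream
    back to the partition it corresponds to in the original stream."""

    @abstractmethod
    def old_partition(self, new_partition, new_count, old_count):
        raise NotImplementedError


class ModuloExpansionMapper(PartitionExpansionMapper):
    """Assumed default policy (e.g. power-of-2 expansion 4 -> 8 -> 16).
    Only valid when new_count is a multiple of old_count."""

    def old_partition(self, new_partition, new_count, old_count):
        if new_count % old_count != 0:
            raise ValueError(f"{new_count} is not a multiple of {old_count}")
        return new_partition % old_count


def key_stays_on_same_task(key_hash, new_count, old_count, mapper):
    """Model a hash-mod partitioner: a key goes to key_hash % partition_count.
    Returns True if, after expansion, the key is still consumed by the task
    that owned its partition before the expansion."""
    partition_before = key_hash % old_count
    partition_after = key_hash % new_count
    return mapper.old_partition(partition_after, new_count, old_count) == partition_before


mapper = ModuloExpansionMapper()
print(all(key_stays_on_same_task(h, 8, 4, mapper) for h in range(10_000)))  # True
```

The multiple-of assumption matters. Suppose a topic with a hash-mod partitioner grows from 4 to 6 partitions. A key with hash 6 moves from old partition 2 to new partition 0, while a key with hash 0 stays in partition 0. New partition 0 therefore mixes keys from two old partitions, and no single mapping can keep every key on its original task. That is why the sketch rejects such an expansion instead of guessing.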

Thanks,
Dong


2017-06-22 Thread Yi Pan
Hi, Dong and everyone,

Thanks for the detailed discussion on SEP-5! I really appreciate the thorough
consideration of this issue. I also noticed that Dong has updated the SEP-5
wiki to clarify:
1) SEP-5 provides a solution to retain the same number of tasks/state w/o
re-partitioning (as illustrated in the stateful join example)
2) Future work to expand the number of tasks needs to work together with
flexible re-partitioning to provide a complete solution
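Point 1 can be made concrete with a minimal Python sketch of how a fixed-tasks grouper could keep the task count constant. Python is used for brevity; Samza itself is Java. The function name is hypothetical. The sketch assumes that the current partition count is a multiple of the initial one, so new partition `p` folds back onto old partition `p % initial_partition_count`. The "Partition N" task names are only meant to resemble those of the existing GroupByPartition grouper.

```python
from collections import defaultdict


def group_by_partition_fixed_tasks(partitions, initial_partition_count):
    """Hypothetical sketch of a fixed-tasks grouper.

    Assumption: the current partition count is a multiple of the initial
    count, so new partition p maps back to old partition
    p % initial_partition_count. The number of tasks (and therefore the
    number of local state stores) never changes.
    """
    partitions = sorted(partitions)
    if len(partitions) % initial_partition_count != 0:
        raise ValueError("partition count must be a multiple of the initial count")
    tasks = defaultdict(list)
    for p in partitions:
        # Each new partition is consumed by the task that owned its "parent".
        tasks[f"Partition {p % initial_partition_count}"].append(p)
    return dict(tasks)


before = group_by_partition_fixed_tasks(range(4), 4)  # original topic: 4 partitions
after = group_by_partition_fixed_tasks(range(8), 4)   # topic expanded to 8 partitions
print(before)  # {'Partition 0': [0], 'Partition 1': [1], 'Partition 2': [2], 'Partition 3': [3]}
print(after)   # {'Partition 0': [0, 4], 'Partition 1': [1, 5], 'Partition 2': [2, 6], 'Partition 3': [3, 7]}
```

After expanding from 4 to 8 partitions, the sketch still produces 4 tasks. For example, task "Partition 1" now consumes partitions 1 and 5, so under a hash-mod partitioner it keeps seeing every key it saw before. Its local state does not need to be shuffled to another task.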

Given the costs of expanding the number of tasks:
1) additional network I/O and latency in re-partitioning
2) shuffling of state among tasks
the current form of SEP-5 provides an alternative when partition expansion
in the messaging system is not caused by an increase in the total input rate.

The concern about the added complexity in the grouper logic is valid.
However, the grouper-based solution is not completely unreasonable:
1) Grouper is a public interface, and we are already open to customized
grouper implementations, although that is not a main use case
2) Deprecating the existing config-driven groupers will take longer: it has
to wait until the fluent API has a better planner that automatically figures
out which grouper to use, and until stateful task expansion is automated.
Hence, for the foreseeable future, the grouper is still configured by the user.

So, in general, I am in favor of the proposed SEP-5, given that it provides
a path of least resistance to addressing some pain points for Samza users,
w/o breaking any existing use cases in opt-in mode.

Some minor suggestions:
1) The class names are too long. Can we change them to
FixedTasksGroupByPartition and FixedTasksGroupBySystemStreamPartition?
2) I am still in favor of a configurable partition expansion policy (i.e.
new<->old partition mapping), since it makes this solution more general and
not specific to Kafka. I am OK with defaulting to the power-of-2 expansion
policy and not introducing a new config variable now.
3) In the checkpoint/coordinator topic validation, the KafkaCheckpointLogKey
class validates that the current grouper factory class == the grouper
factory class in the previous checkpoint. We need to make sure that we allow
the compatible change from GroupByPartition to FixedTasksGroupByPartition,
etc. Since FixedTasksGroupByPartition is a derived interface of
GroupByPartition, one possible solution is to check assignability (i.e.
whether the current grouper factory class is assignable to the previous
grouper factory class).
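Here is a minimal sketch of the relaxed check in point 3, modeled in Python with `issubclass`. In Java, the corresponding test would be `previousFactoryClass.isAssignableFrom(currentFactoryClass)`, using the standard JDK method `Class.isAssignableFrom`. The classes below are empty stand-ins, not Samza's real grouper factories. The two check functions are hypothetical and do not reproduce the actual KafkaCheckpointLogKey logic.

```python
class GroupByPartitionFactory:
    """Stand-in for the existing grouper factory."""


class FixedTasksGroupByPartitionFactory(GroupByPartitionFactory):
    """Stand-in for the proposed fixed-tasks variant, modeled as a subclass."""


class GroupBySystemStreamPartitionFactory:
    """Stand-in for an unrelated grouper factory."""


def strict_check(previous_cls, current_cls):
    # Equality check: any change of grouper factory is rejected.
    return current_cls is previous_cls


def assignable_check(previous_cls, current_cls):
    # Relaxed check: the current factory may be the previous one or derive from it.
    return issubclass(current_cls, previous_cls)


upgrade = (GroupByPartitionFactory, FixedTasksGroupByPartitionFactory)
print(strict_check(*upgrade), assignable_check(*upgrade))  # False True
```

The relaxed check is one-directional. Switching back from the fixed-tasks factory to plain GroupByPartitionFactory would still be rejected in this model. Switching to an unrelated grouper such as GroupBySystemStreamPartitionFactory would also be rejected.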

Thanks a lot!

2017-06-21 Thread Dong Lin
Hey Navina,

I appreciate all the comments you have provided! I have updated the wiki to
remove task expansion from the Rejected Alternative section and put it only
in the Future Work section.

Thanks much!
Dong

2017-06-21 Thread Navina Ramesh (Apache)
> But IMO it is the best available solution towards the support of
> partition expansion in comparison to the alternatives, no?

At this time, relative to the other alternatives you have listed, this is a
path of least effort to solving this problem. I agree to that. :)

> I can merge those two sections or update the statement if the current
> statement has not clearly explained the reason of partition expansion in Kafka.

Given the significance of what you are actually trying to solve, I think it
will be better to have it in points. Let me come find you and we can update
it.

> I have updated wiki and added the task expansion to the Future Work section.
> On the other hand I still keep it in the Rejected Alternative section to
> explain why this future work does not replace the existing proposal in
> SEP-5. Does this sound reasonable?

It is very confusing to me how the same point can be under "Future Work"
and "Rejected Alternative". There is no question about the future work
*replacing* SEP-5. IIUC, this SEP is a subset of the partition expansion
solution. So, I don't think increasing the task count should be a rejected
alternative.

> I am also not sure why a feature needs to be "utmost priority" in order
> to be accepted. Can you explain a bit on that?

I don't think I ever claimed that the feature needs to be of "utmost
priority" to be accepted. I was just stating my opinion.


Thanks!
Navina

2017-06-21 Thread Dong Lin
Thanks much for the reply Navina. Please see my reply inline.

On Wed, Jun 21, 2017 at 2:57 PM, Navina Ramesh (Apache) 
wrote:

> Thanks to Jake, Dong and Kartik for keeping the discussion going.
>
> > Here are the pros and cons of the extra re-partitioning stage in
> > comparison to SEP-5.
>
> I think that is a good summarization of the pros/cons of the repartitioning
> stage-based solution. Can you please include it in your SEP? It seems like
> you already have access. If you are still unable to access the wiki page,
> feel free to walk over to Samza area and find me!
>

Sure. I have added this summary to the Alternative Section.


>
> > I think there is always a way for user to mess up their job if they
> > configure the Samza job incorrectly.
>
> I don't think Jake or anyone is arguing about an "incorrectly" configured
> Samza job. The question was towards how easy/difficult it is for users to
> *not mess* up their job with incorrect configurations.
>
> > I also think the assumption made in this SEP is not particularly harder
> > to understand than other existing configs in Samza.
>
> I disagree here. Other configs don't require you to understand more than
> one assumption.
>
> There is already an overload of configs in Samza, and I think we are trying
> to shield users from it as much as possible (esp. with the fluent API).
> More specifically, we don't want the user to know about Samza internals
> such as the SSP grouper, TaskName grouper, etc. Since the proposed solution
> makes the configuration more complex to understand, it *is a* burden on the
> user.
>
> Just because configs are the way they are, it doesn't mean we should
> increase their complexity and push the burden of managing them correctly
> onto users. My two cents.
>

Sure, I agree the proposal requires users to understand the assumption in
order to expand the partitions of the topic. But whether the added
complexity is acceptable is quite subjective. If there is a better way to
let users expand the partitions of an input stream without making this
assumption, then we can just do that. The current solution is not perfect,
but IMO it is the best available way to support partition expansion
compared to the alternatives, no?


> Here are a few things that I believe are needed for wrapping up the SEP:
>
> 1. For the longest time, I thought partition expansion happens in Kafka
> only when the volume of messages across partitions is too high. Based on
> this, I assumed that re-mapping expanded partitions to the same task
> would have an adverse effect on the throughput/resource utilization of the
> processor/container in Samza (for example, disk utilization may increase
> significantly, and with disk quota throttling it could cause the processor
> to drop). However, after speaking with Xinyu, it
> turns out that partition expansion also happens when there is a
> per-partition data retention limit imposed by Kafka (not sure if it is only
> in LinkedIn or in Kafka open-source as well). IMO, this is the primary
> use case that we are trying to solve for in Samza, and it is not very
> obvious from the SEP.
> @Dong, can you please explain *the circumstances under which partition
> expansion can happen* in the "Motivation" section? I disagree with the
> current motivation described as -> "This design doc provides a solution to
> increase partition number of the input streams of a stateful Samza job
> while still ensuring the correctness of Samza job output."
> This is a solution, albeit not fully done through this SEP alone.
>

This is actually already described in the Problem and Goal section, i.e.
"For example, Kafka generally needs to limit the maximum size of each
partition to scale up its performance. Thus the number of partitions of a
Kafka topic needs to be expanded to reduce the partition size if the
average byte-in-rate or retention time of the Kafka topic has doubled". I
can merge those two sections or update the statement if it does not
clearly explain the reason for partition expansion in Kafka.
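The sizing argument in the quoted Problem and Goal text can be made concrete with a back-of-the-envelope sketch. It assumes, for illustration only, that data is spread evenly over partitions and that a partition's size is roughly byte-in rate times retention divided by partition count; replication, compression and segment overhead are ignored. `PartitionSizing` and all numbers are hypothetical, not a Kafka or Samza API.

```java
/**
 * Back-of-the-envelope partition sizing (hypothetical helper, not a Kafka or Samza API).
 * Assumes data is spread evenly over partitions and ignores replication,
 * compression and log-segment overhead.
 */
public final class PartitionSizing {

    private PartitionSizing() {}

    /** Smallest partition count that keeps each partition at or below maxPartitionBytes. */
    public static int requiredPartitions(long bytesInPerSec, long retentionSec, long maxPartitionBytes) {
        if (bytesInPerSec <= 0 || retentionSec <= 0 || maxPartitionBytes <= 0) {
            throw new IllegalArgumentException("all inputs must be positive");
        }
        // Total bytes the topic retains at steady state.
        long retainedBytes = Math.multiplyExact(bytesInPerSec, retentionSec);
        // Ceiling division without risking overflow (retainedBytes >= 1 here).
        long partitions = (retainedBytes - 1) / maxPartitionBytes + 1;
        return Math.toIntExact(partitions);
    }

    public static void main(String[] args) {
        long cap = 100_000_000_000L; // hypothetical 100 GB per-partition limit
        long rate = 10_000_000L;     // hypothetical 10 MB/s byte-in rate
        long day = 86_400L;          // 1 day of retention, in seconds
        System.out.println(requiredPartitions(rate, day, cap));     // 9
        System.out.println(requiredPartitions(2 * rate, day, cap)); // 18 (byte-in rate doubled)
        System.out.println(requiredPartitions(rate, 2 * day, cap)); // 18 (retention doubled)
    }
}
```

Doubling either the byte-in rate or the retention time doubles the retained bytes, so under these assumptions the partition count has to roughly double to stay under the same per-partition limit.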


>
> 2. I think we are in consensus that increasing the task number and
> handling the state correctly is a good solution for Samza in the
> long run. In your rejected alternatives, you mention "However, this feature
> alone does not solve the problem of allowing partition expansion.". What
> else is required to allow partition expansion? Can you please elaborate on
> that in point #1 of the rejected alternatives? If there is still more work
> to be done to support partition expansion in Samza, it is worthwhile to
> mention it under *Future Work*, instead of under "Rejected Alternatives".
> Perhaps you were waiting for edit permissions to the wiki. Please make this
> change so it is well-tracked.
>

I thought this was already explained in the Rejected Alternatives section.
More specifically, it says "However, this feature alone does not
solve the problem of allowing partition expansion. For exa

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-21 Thread Navina Ramesh (Apache)
Thanks to Jake, Dong and Kartik for keeping the discussion going.

> Here are the pros and cons of the extra re-partitioning stage in
> comparison to SEP-5.

I think that is a good summary of the pros/cons of the repartitioning-stage
based solution. Can you please include it in your SEP? It seems like
you already have access. If you are still unable to access the wiki page,
feel free to walk over to the Samza area and find me!

> I think there is always a way for users to mess up their jobs if they
> configure the Samza job incorrectly.

I don't think Jake or anyone is arguing about an "incorrectly" configured
Samza job. The question was about how easy or difficult it is for users to
*not mess* up their job with incorrect configurations.

> I also think the assumption made in this SEP is not particularly harder
> to understand than other existing configs in Samza.

I disagree here. Other configs don't require you to understand more than one
assumption.

There is already an overload of configs in Samza, and I think we are trying
to shield users from them as much as possible (esp. with the fluent API).
More specifically, we don't want the user to know about Samza internals
such as the SSP grouper, TaskName grouper, etc. Since the proposed solution
makes the configuration more complex to understand, it *is a* burden on the
user.

Just because the configs are the way they are doesn't mean we should increase
their complexity and push the burden of managing them correctly onto users. My
two cents.

Here are a few things that I believe are needed for wrapping up the SEP:

1. For the longest time, I thought partition expansion happens in Kafka
only when the volume of messages across partitions is too high. Based on
this, I assumed that re-mapping expanded partitions to the same task
would have an adverse effect on the throughput/resource utilization of the
processor/container in Samza (for example, disk utilization may increase
significantly, and with disk quota throttling it could cause the processor
to drop). However, after speaking with Xinyu, it
turns out that partition expansion also happens when there is a
per-partition data retention limit imposed by Kafka (not sure if it is only
in LinkedIn or in Kafka open-source as well). IMO, this is the primary
use case that we are trying to solve for in Samza, and it is not very
obvious from the SEP.
@Dong, can you please explain *the circumstances under which partition
expansion can happen* in the "Motivation" section? I disagree with the
current motivation described as -> "This design doc provides a solution to
increase partition number of the input streams of a stateful Samza job
while still ensuring the correctness of Samza job output."
This is a solution, albeit not fully done through this SEP alone.

2. I think we are in consensus that increasing the task number and
handling the state correctly is a good solution for Samza in the
long run. In your rejected alternatives, you mention "However, this feature
alone does not solve the problem of allowing partition expansion.". What
else is required to allow partition expansion? Can you please elaborate on
that in point #1 of the rejected alternatives? If there is still more work
to be done to support partition expansion in Samza, it is worthwhile to
mention it under *Future Work*, instead of under "Rejected Alternatives".
Perhaps you were waiting for edit permissions to the wiki. Please make this
change so it is well-tracked.

I am still not totally crazy about the proposed solution because it is not
clear who in the open-source community, or which use cases, stand to
benefit. I am not convinced that this problem is of utmost priority for the
Samza community *at this point in time*.

I am on the same page as Jake on this one. Not a +1, just a 0 (if that even
matters).

Thanks!
Navina

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-18 Thread Dong Lin
BTW, I will update the SEP-5 wiki with our latest discussion once I have
wiki edit access.

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-17 Thread Dong Lin
Thanks everyone for the comments!

I am currently leaning towards the current approach. I think Kartik raised
a good point that the extra repartitioning stage will also incur additional
throughput on Kafka in addition to the potential storage cost. Could other
Samza developers also chime in and provide their opinions on this proposal?

Since this discussion thread has been open for three weeks, I will initiate a
voting thread on Monday if there is no major revision suggestion.

Thanks,
Dong


Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-15 Thread Kartik Paramasivam
Great discussion!

Here are some more thoughts.

The point that repartitioning is a more general-purpose solution is surely
spot on. For many source systems (Kinesis, Google Pub/Sub, and any of the
older queuing systems such as RabbitMQ), repartitioning is functionally
required anyway to do even simple keyed aggregations. But in most
of these systems, the concept of repartitioning either does not exist or
exists in a very unique way (e.g. Kinesis).

I think this feature is really only interesting for source systems like
Kafka and EventHub.  EventHub (last I checked) didn't support
repartitioning. So this is probably not super-interesting (yet) for
EventHub.

So Kafka is clearly the main use case here.

For Kafka, I think it is pretty rare for people to customize the hashing
algorithm for sending messages. I would argue that less than 5% of the
population (I am being generous ;)) would do that. The current proposal
works with the default hashing scheme for Kafka. So organizations will
typically never have to coordinate.
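Why the default hashing scheme matters can be checked with a small sketch. It assumes, for illustration, that a keyed message goes to `positiveHash(key) % numPartitions` (Kafka's default producer partitioner does this with a murmur2 hash; `String.hashCode()` stands in for it here) and that the partition count grows by an integer factor. Under those assumptions, a key that lands in new partition `p` previously lived in old partition `p % oldCount`, which is one way to state a new-partition-to-old-partition mapping (an assumption of this sketch, not a quote from SEP-5). The class, the `Partitioner` interface and the range-based custom partitioner are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Checks whether a partitioner keeps "new partition p came from old partition p % oldCount"
 * after the partition count grows. Illustrative only; all names are hypothetical.
 */
public final class ExpansionCheck {

    private ExpansionCheck() {}

    /** Maps a key to a partition index in [0, numPartitions). */
    public interface Partitioner {
        int partition(String key, int numPartitions);
    }

    /** Hash-mod partitioning; String.hashCode() stands in for Kafka's murmur2 hash. */
    public static int hashModPartition(String key, int numPartitions) {
        int positiveHash = key.hashCode() & 0x7fffffff; // clear the sign bit so the result is non-negative
        return positiveHash % numPartitions;
    }

    /** Hypothetical custom partitioner: contiguous key ranges by first letter. */
    public static int rangePartition(String key, int numPartitions) {
        int letter = Character.toLowerCase(key.charAt(0)) - 'a';
        int clamped = Math.max(0, Math.min(25, letter)); // 0..25
        return clamped * numPartitions / 26;
    }

    /** True if, for every key, newPartition % oldCount equals the key's old partition. */
    public static boolean modMappingHolds(Partitioner p, List<String> keys, int oldCount, int newCount) {
        for (String key : keys) {
            int oldPartition = p.partition(key, oldCount);
            int newPartition = p.partition(key, newCount);
            if (newPartition % oldCount != oldPartition) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("alice", "bob", "carol", "mallory", "trent", "zed");
        // Expand from 4 to 8 partitions (an integer factor of 2).
        System.out.println(modMappingHolds(ExpansionCheck::hashModPartition, keys, 4, 8)); // true
        System.out.println(modMappingHolds(ExpansionCheck::rangePartition, keys, 4, 8));   // false
    }
}
```

With hash-mod partitioning the check passes for any keys, because (h mod 8) mod 4 = h mod 4 whenever 4 divides 8. With the range partitioner, "mallory" moves from old partition 1 to new partition 3, so its per-key state would end up in a different task; that is the kind of customized hashing the proposal does not cover.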

If the proposed alternative (always repartition) were side-effect free, then
it would make sense to use an alternative design that would work for 100%
of the population. Repartitioning all input would, however, not be a
feasible solution (at least at LinkedIn) as it would double the Kafka
workload. If many Samza jobs read from Kafka topics, then the increase
would be a function of the number of Samza jobs.
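The point that the extra load grows with the number of jobs can be put as a simple linear model. This is a hypothetical estimate, not measured data: it assumes each job with its own repartitioning stage writes its full input once to an intermediate topic and reads it back once, counts only bytes written to plus read from the brokers, and ignores replication, compression and retention. `RepartitionCost` and the 50 MB/s rate are illustrative.

```java
/**
 * Simplified broker-traffic model for one input topic (hypothetical, not measured data).
 * Counts bytes written to plus bytes read from the brokers; ignores replication,
 * compression and retention.
 */
public final class RepartitionCost {

    private RepartitionCost() {}

    /** The producer writes the topic once; each of the jobs reads it once. */
    public static long baselineBrokerBytesPerSec(long inputBytesPerSec, int jobs) {
        return inputBytesPerSec + (long) jobs * inputBytesPerSec;
    }

    /** Each job also writes the input to its own intermediate topic and reads it back. */
    public static long withRepartitionBytesPerSec(long inputBytesPerSec, int jobs) {
        return baselineBrokerBytesPerSec(inputBytesPerSec, jobs) + 2L * jobs * inputBytesPerSec;
    }

    public static void main(String[] args) {
        long rate = 50_000_000L; // hypothetical 50 MB/s input topic
        for (int jobs : new int[] {1, 5, 20}) {
            long base = baselineBrokerBytesPerSec(rate, jobs);
            long withStage = withRepartitionBytesPerSec(rate, jobs);
            System.out.println(jobs + " job(s): " + base / 1_000_000 + " -> " + withStage / 1_000_000 + " MB/s");
        }
    }
}
```

In this model a single job doubles the topic's broker traffic (100 to 200 MB/s here), and the added traffic, 2 × jobs × input rate, grows linearly with the number of jobs that repartition the same topic.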

For low-throughput Kafka topics, explicit repartitioning using the
fluent API is surely feasible.

If the proposal were to make this new policy the default, then that would
clearly not make much sense.

But it is an opt-in policy. If it is not applicable, people don't have to
use it.

I do have some questions about the implementation. I will try to respond
back after spending some more time on this.



Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-15 Thread Jacob Maes
Thanks, Dong.

The summary looks accurate.

I'll let the others chime in, as I believe my perspective has been
adequately captured in this thread.

-Jake

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-14 Thread Dong Lin
Hey Jacob,

Thank you for taking so much time to discuss with me! I appreciate the
discussion and the insight. I will summarize our discussion below.

1) Whether it is reasonable to store a partition-to-task mapping.

We agree that this partition-to-task mapping will be reasonable if we allow
users to specify either the new-partition-to-old-partition mapping or the
key-to-partition mapping in the future. SEP-5 doesn't currently provide a
way for users to specify the new-partition-to-old-partition mapping because
we won't have a good idea of what that interface should look like until we
try to enable partition expansion for input systems other than Kafka. This
is currently listed as the third item of future work in SEP-5.

And if we decide to implement SEP-5, I will include a warning message
regarding the use of the partition-to-task mapping, i.e. "this does not
specify the key-to-task mapping". We agree that this could address the
concern here.
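A minimal sketch of what a fixed-task partition-to-task mapping could look like, assuming (hypothetically) that new partition p holds keys that used to live in partition p % originalPartitionCount. It computes the mapping that would then be persisted; it does not use Samza's grouper types, and `FixedTaskAssignment` and the "Partition N" task names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical fixed-task assignment for an expanded input stream (not Samza's grouper API).
 * The task count stays at the original partition count, and each new partition p is
 * assigned to the task that used to own partition p % originalPartitionCount.
 * Like the warning discussed in this message, this fixes partition-to-task only;
 * it does not by itself specify a key-to-task mapping.
 */
public final class FixedTaskAssignment {

    private FixedTaskAssignment() {}

    public static Map<String, List<Integer>> assign(int originalPartitionCount, int currentPartitionCount) {
        if (originalPartitionCount <= 0
                || currentPartitionCount < originalPartitionCount
                || currentPartitionCount % originalPartitionCount != 0) {
            throw new IllegalArgumentException("partition count must grow by a positive integer factor");
        }
        Map<String, List<Integer>> tasks = new TreeMap<>();
        for (int p = 0; p < currentPartitionCount; p++) {
            // New partition p inherits the task of old partition p % originalPartitionCount.
            String taskName = "Partition " + (p % originalPartitionCount);
            tasks.computeIfAbsent(taskName, name -> new ArrayList<>()).add(p);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // The stream started with 4 partitions and was expanded to 8.
        System.out.println(assign(4, 8));
        // {Partition 0=[0, 4], Partition 1=[1, 5], Partition 2=[2, 6], Partition 3=[3, 7]}
    }
}
```

Keeping the task count fixed means each task's local state and changelog stay valid after expansion; the cost is that each task now consumes more partitions.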

2) Whether we should follow the approach in SEP-5 or use an extra
re-partitioning stage in the stateful Samza job to enable partition
expansion.

Here are the pros and cons of the extra re-partitioning stage in comparison
to SEP-5.

Pros:
- It doesn't require the owner of the Samza job to know the partitioning
algorithm used for the input stream. If the owner of the Samza job is in
a different organization than the producer of the input stream, this
solution frees the two organizations from having to coordinate with each
other.
- It doesn't require the owner of the Samza job to specify the partitioning
algorithm used for the input stream. Thus, less config.

Cons:
- Users have to make code changes on their side to use the new fluent API.
- The extra partitioning stage would potentially increase latency.
- The extra partitioning stage would incur additional cost due to the extra
internal topic. The cost is probably not that much with the new trim() API
in Kafka if Samza uses Kafka to store the internal topic. But the cost may
be doubled if Samza uses another input system that doesn't provide a trim()
API to delete data on demand.

My recommendation is to adopt a hybrid solution, i.e. we still implement
the current proposal in SEP-5 so that we enable partition expansion without
incurring extra latency/cost and without requiring users to change their
code. And we can recommend that users adopt the extra partitioning stage if
coordination among different organizations is indeed a concern.
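For comparison, the extra re-partitioning stage can be sketched without any Samza APIs. The in-memory lists standing in for topics, the `Message` class and all method names are hypothetical; in a real job this would be expressed with the fluent API and an intermediate stream. The sketch illustrates the first "Pros" item: the stateful stage only sees the job's own partitioning of the intermediate stream, so neither the producer's partitioning algorithm nor its partition count matters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public final class RepartitionStageSketch {

    private RepartitionStageSketch() {}

    /** Hypothetical keyed message standing in for an input-system envelope. */
    public static final class Message {
        public final String key;
        public final String value;

        public Message(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** The job's own partitioning function for the intermediate stream. */
    public static int intermediatePartition(String key, int numIntermediate) {
        return (key.hashCode() & 0x7fffffff) % numIntermediate;
    }

    /** Routes messages from any number of input partitions into numIntermediate partitions. */
    public static List<List<Message>> repartition(List<List<Message>> inputPartitions, int numIntermediate) {
        List<List<Message>> out = emptyPartitions(numIntermediate);
        for (List<Message> partition : inputPartitions) {
            for (Message m : partition) {
                out.get(intermediatePartition(m.key, numIntermediate)).add(m);
            }
        }
        return out;
    }

    /** Producer-side layout that ignores keys entirely: round-robin over n partitions. */
    public static List<List<Message>> roundRobin(List<Message> messages, int n) {
        List<List<Message>> partitions = emptyPartitions(n);
        for (int i = 0; i < messages.size(); i++) {
            partitions.get(i % n).add(messages.get(i));
        }
        return partitions;
    }

    /** Set of keys in each partition, so layouts can be compared regardless of arrival order. */
    public static List<Set<String>> keysPerPartition(List<List<Message>> partitions) {
        List<Set<String>> result = new ArrayList<>();
        for (List<Message> partition : partitions) {
            Set<String> keys = new TreeSet<>();
            for (Message m : partition) {
                keys.add(m.key);
            }
            result.add(keys);
        }
        return result;
    }

    private static List<List<Message>> emptyPartitions(int n) {
        List<List<Message>> partitions = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            partitions.add(new ArrayList<>());
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Message> messages = new ArrayList<>();
        for (String key : new String[] {"alice", "bob", "carol", "dave", "erin", "frank"}) {
            messages.add(new Message(key, "v1"));
            messages.add(new Message(key, "v2"));
        }
        // The producer grows the input from 2 to 6 partitions using an unrelated scheme.
        List<Set<String>> before = keysPerPartition(repartition(roundRobin(messages, 2), 4));
        List<Set<String>> after = keysPerPartition(repartition(roundRobin(messages, 6), 4));
        System.out.println(before.equals(after)); // true: the stateful stage sees the same key layout
    }
}
```

The price of this independence is the extra intermediate topic listed under "Cons": every message is written and read one more time.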

Can other developers also provide feedback regarding their preference
between the two?

Thanks,
Dong




On Tue, Jun 13, 2017 at 9:30 AM, Jacob Maes  wrote:

> Hey Dong,
>
> I appreciate your thoughtful responses. Let's do one more round :-)
>
>
> > Here are my current concerns with the three alternatives you described
> > earlier:
> > - The first alternative requires support from the input system, which is
> > currently not available. It will limit the usage of partition expansion to
> > only systems that support such an interface. And it is not guaranteed that
> > we can persuade the developers of the input system to add this interface.
> > This is not desirable for Samza in the long term.
>
> Agreed. It is very wishful thinking that each supported system would
> provide such a contract.
>
>
> >
> > - I cannot comment on the second alternative because I don't understand
> > how it reshuffles all existing changelog data. We can discuss more if
> > there is more specific detail. My gut feeling is that this will be complex
> > and carry performance overhead.
>
> After giving this more thought, I agree. There is no clear way to migrate a
> changelog without knowing the original key->partition mapping. Which leads
> us to alternative 3...
>
>
> >
> > - The third alternative incurs performance overhead. Given that users can
> > already use this solution to enable partition expansion, maybe Samza
> > developers can provide more input as to why we are not doing it by
> > default. My gut feeling is that it carries considerable performance
> > overhead and increases the cost-to-serve of Samza jobs (e.g. disk usage),
> > which may make it undesirable in the long term.
>
> I think the only performance overhead would be the mandatory repartitioning
> stage for stateful jobs. But a repartitioner is usually much faster than
> the downstream stateful job, so it seems to be only a cost-to-serve issue.
>
> As for why we aren't already doing this, I would posit that before the
> introduction of the high level API, which trivializes repartitioning, it
> was unreasonable to expect each job owner to do the mandatory
> repartitioning. With the high level API, I would argue this is much more
> doable.
>
>
> > I am not sure it is true that "any future feature that utilizes this
> > mapping without accounting for the assumptions of this SEP is likely to
> > malfunction". Suppose we allow users to specify a new-to-old-partition
> > mapping; then we can use the partition-to-task mapping correctly without
> > relying on the assumptions in this SEP, right?
>
> Right

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-13 Thread Jacob Maes
Hey Dong,

I appreciate your thoughtful responses. Let's do one more round :-)


> Here are my current concerns with the three alternatives you described
> earlier:
> - The first alternative requires support from the input system, which is
> currently not available. It will limit the usage of partition expansion to
> only systems that support such an interface. And it is not guaranteed that we
> can persuade the developers of the input system to add this interface. This
> is not desirable for Samza in the long term.

Agreed. It is very wishful thinking that each supported system would
provide such a contract.


>
> - I cannot comment on the second alternative because I don't understand
> how it reshuffles all existing changelog data. We can discuss more if there
> is more specific detail. My gut feeling is that this will be complex and
> carry performance overhead.

After giving this more thought, I agree. There is no clear way to migrate a
changelog without knowing the original key->partition mapping. Which leads
us to alternative 3...


>
> - The third alternative incurs performance overhead. Given that users can
> already use this solution to enable partition expansion, maybe Samza
> developers can provide more input as to why we are not doing it by default.
> My gut feeling is that it carries considerable performance overhead and
> increases the cost-to-serve of Samza jobs (e.g. disk usage), which may make
> it undesirable in the long term.

I think the only performance overhead would be the mandatory repartitioning
stage for stateful jobs. But a repartitioner is usually much faster than
the downstream stateful job, so it seems to be only a cost-to-serve issue.

As for why we aren't already doing this, I would posit that before the
introduction of the high level API, which trivializes repartitioning, it
was unreasonable to expect each job owner to do the mandatory
repartitioning. With the high level API, I would argue this is much more
doable.


> I am not sure it is true that "any future feature that utilizes this
> mapping without accounting for the assumptions of this SEP is likely to
> malfunction". Suppose we allow users to specify a new-to-old-partition
> mapping; then we can use the partition-to-task mapping correctly without
> relying on the assumptions in this SEP, right?

Right, but my point was that the partition->task mapping is not sufficient
by itself. So adding it by itself is potentially misleading.

On Mon, Jun 12, 2017 at 8:34 PM, Dong Lin  wrote:

> Thanks for the reply Jacob. Please see my comment inline.
>
> On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes  wrote:
>
> > >
> > > - For users that need partition expansion of the input streams for
> > > stateful jobs, they have a really big headache in the sense that Samza
> > > does not allow partition expansion for stateful jobs. SEP-5 addresses
> > > this headache for them.
> > > You are right that SEP-5 requires users to understand and enforce
> > > limitations across organizations. But it is still much better than not
> > > allowing users to expand partitions for stateful jobs at all, right? Did
> > > I miss something here?
> >
> > I guess this one is a matter of perspective.
> >
> > One argument is that if the system supports one case, it's better than
> > none because there is one less scenario in which the system does the wrong
> > thing.
> >
> > The counter argument is for uniform and consistent behavior, which is easy
> > for users to understand and properly leverage.
> >
> > Specifically, I'd argue that the current rule is very simple: "you cannot
> > repartition inputs on a stateful job, so you must over-partition the
> > initial implementation". To me, while that rule is not ideal, its
> > simplicity is better than introducing a new solution that has a bunch of
> > caveats, any one of which could be missed. If any one of the assumptions
> > in this SEP design is violated, the job would behave incorrectly. That
> > puts a lot more burden on the users than the simpler rule.
> >
>
> I agree that we have different perspectives here. It is true that users
> would mess up their jobs if they used this feature in the wrong way, i.e.
> violated the assumptions made in SEP-5. On the other hand, I think there is
> always a way for users to mess up their jobs if they configure the Samza job
> incorrectly. I also think the assumptions made in this SEP are not
> particularly harder to understand than other existing configs in Samza.
>
> The answer to this can be subjective. I would love to hear perspectives
> from other developers on this issue.
>
>
> >
> > That's why I mentioned a few alternatives that, while more complex to
> > implement, would provide a more consistent behavior with simple rules for
> > the users.
> >
>
> I am open to discussing alternative solutions that can address the problem
> in a better manner. I am not opposed to complexity as long as it gives us
> good long-term benefits.
>
> Here are my current concern with the three alternatives y

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-12 Thread Dong Lin
Thanks for the reply Jacob. Please see my comment inline.

On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes  wrote:

> >
> > - For users that need partition expansion of the input streams for
> > stateful jobs, they have a really big headache in the sense that Samza
> > does not allow partition expansion for stateful jobs. SEP-5 addresses
> > this headache for them.
> > You are right that SEP-5 requires users to understand and enforce
> > limitations across organizations. But it is still much better than not
> > allowing users to expand partitions for stateful jobs at all, right? Did
> > I miss something here?
>
> I guess this one is a matter of perspective.
>
> One argument is that if the system supports one case, it's better than none
> because there is one less scenario in which the system does the wrong
> thing.
>
> The counter argument is for uniform and consistent behavior, which is easy
> for users to understand and properly leverage.
>
> Specifically, I'd argue that the current rule is very simple: "you cannot
> repartition inputs on a stateful job, so you must over-partition the
> initial implementation". To me, while that rule is not ideal, its
> simplicity is better than introducing a new solution that has a bunch of
> caveats, any one of which could be missed. If any one of the assumptions in
> this SEP design is violated, the job would behave incorrectly. That puts a
> lot more burden on the users than the simpler rule.
>

I agree that we have different perspectives here. It is true that users would
mess up their jobs if they used this feature in the wrong way, i.e. violated
the assumptions made in SEP-5. On the other hand, I think there is always a
way for users to mess up their jobs if they configure the Samza job
incorrectly. I also think the assumptions made in this SEP are not
particularly harder to understand than other existing configs in Samza.

The answer to this can be subjective. I would love to hear perspectives from
other developers on this issue.


>
> That's why I mentioned a few alternatives that, while more complex to
> implement, would provide a more consistent behavior with simple rules for
> the users.
>

I am open to discussing alternative solutions that can address the problem
in a better manner. I am not opposed to complexity as long as it gives us
good long-term benefits.

Here are my current concerns with the three alternatives you described
earlier:

- The first alternative requires support from the input system, which is
currently not available. It will limit the usage of partition expansion to
only systems that support such an interface. And it is not guaranteed that we
can persuade the developers of the input system to add this interface. This
is not desirable for Samza in the long term.

- I cannot comment on the second alternative because I don't understand
how it reshuffles all existing changelog data. We can discuss more if there
is more specific detail. My gut feeling is that this will be complex and
carry performance overhead.

- The third alternative incurs performance overhead. Given that users can
already use this solution to enable partition expansion, maybe Samza
developers can provide more input as to why we are not doing it by default.
My gut feeling is that it carries considerable performance overhead and
increases the cost-to-serve of Samza jobs (e.g. disk usage), which may make it
undesirable in the long term.



>
> > Yes, we need a similar check for
> > GroupBySystemStreamPartitionWithFixedTaskNum as well. If more grouper
> > classes are needed in the future, we can solve this problem cleanly
> > without new config. Given the previousGrouperClass and newGrouperClass,
> > KafkaCheckpointLogKey will throw an exception if and only if
> > newGrouperClass is not previousGrouperClass or a subclass of it.
> > GroupBySystemStreamPartitionWithFixedTaskNum should extend
> > GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum should
> > extend GroupByPartition. Does this address your concern?
>
> Sounds workable, thanks.
>
> >
> > Can you be more specific about why the partition-to-task mapping is not
> > meaningful without some definition of the key-to-partition assignments
> > and why it is incomplete and misleading?
>
> A partition is (in my naive interpretation) an independent queue for
> messages of a particular key set. It is not the *identity* of the partition
> that determines the contents of the associated task's local state. Rather,
> it is the *contents* of the partition that affect the task's state. A
> partition-to-task mapping only captures an identity relationship:
> partition1->task1. Without the assumptions of this SEP, this is
> insufficient to determine the assignment of keys to tasks, which is what
> really matters. Therefore, any future feature that utilizes this mapping
> without accounting for the assumptions of this SEP is likely to
> malfunction.
>
>
I am not sure it is true that "any future feature that utilizes this
mapping without accounting f

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-12 Thread Jacob Maes
>
> - For users that need partition expansion of the input streams for stateful
> jobs, they have a really big headache in the sense that Samza does not allow
> partition expansion for stateful jobs. SEP-5 addresses this headache for
> them.
> You are right that SEP-5 requires users to understand and enforce
> limitations across organizations. But it is still much better than not
> allowing users to expand partitions for stateful jobs at all, right? Did I
> miss something here?

I guess this one is a matter of perspective.

One argument is that if the system supports one case, it's better than none
because there is one less scenario in which the system does the wrong
thing.

The counter argument is for uniform and consistent behavior, which is easy
for users to understand and properly leverage.

Specifically, I'd argue that the current rule is very simple: "you cannot
repartition inputs on a stateful job, so you must over-partition the
initial implementation". To me, while that rule is not ideal, its
simplicity is better than introducing a new solution that has a bunch of
caveats, any one of which could be missed. If any one of the assumptions in
this SEP design is violated, the job would behave incorrectly. That puts a
lot more burden on the users than the simpler rule.

That's why I mentioned a few alternatives that, while more complex to
implement, would provide a more consistent behavior with simple rules for
the users.

> Yes, we need a similar check for
> GroupBySystemStreamPartitionWithFixedTaskNum as well. If more grouper
> classes are needed in the future, we can solve this problem cleanly without
> new config. Given the previousGrouperClass and newGrouperClass,
> KafkaCheckpointLogKey will throw an exception if and only if
> newGrouperClass is not previousGrouperClass or a subclass of it.
> GroupBySystemStreamPartitionWithFixedTaskNum should extend
> GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum should
> extend GroupByPartition. Does this address your concern?

Sounds workable, thanks.

>
> Can you be more specific about why the partition-to-task mapping is not
> meaningful without some definition of the key-to-partition assignments and
> why it is incomplete and misleading?

A partition is (in my naive interpretation) an independent queue for
messages of a particular key set. It is not the *identity* of the partition
that determines the contents of the associated task's local state. Rather, it
is the *contents* of the partition that affect the task's state. A
partition-to-task mapping only captures an identity relationship:
partition1->task1. Without the assumptions of this SEP, this is
insufficient to determine the assignment of keys to tasks, which is what
really matters. Therefore, any future feature that utilizes this mapping
without accounting for the assumptions of this SEP is likely to malfunction.
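A small worked example of why the identity mapping alone is not enough: with a hash+modulo producer, a key with hash h sits in old partition h mod N; after expanding to kN partitions it lands in new partition h mod kN, and mapping that partition back with (h mod kN) mod N gives h mod N again, so the key stays on its task. If the new count is not a multiple of N, the same partition-to-task rule silently moves keys. The Java sketch below checks this over a range of hashes; the partitioner, the "p mod N" task assignment and all names are assumptions for illustration, not Samza code.

```java
// Hypothetical sketch: do keys stay on the same task when a hash+modulo
// partitioned stream grows from oldCount to newCount partitions while the
// task count is kept fixed?
final class KeyToTaskSketch {

    // Assumed producer partitioner: hash + modulo (the SEP-5 assumption).
    static int partitionFor(int keyHash, int partitionCount) {
        return Math.floorMod(keyHash, partitionCount);
    }

    // After expansion, new partition p is consumed by the task that owned old
    // partition p mod oldCount (before expansion, task i consumed partition i).
    static int taskAfterExpansion(int keyHash, int oldCount, int newCount) {
        return partitionFor(keyHash, newCount) % oldCount;
    }

    // True if every key hash in [fromHash, toHash) keeps its task.
    static boolean keysStayOnSameTask(int oldCount, int newCount, int fromHash, int toHash) {
        for (int h = fromHash; h < toHash; h++) {
            int taskBefore = partitionFor(h, oldCount);
            if (taskAfterExpansion(h, oldCount, newCount) != taskBefore) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(keysStayOnSameTask(4, 8, -1000, 1000)); // prints true: 8 is a multiple of 4
        System.out.println(keysStayOnSameTask(4, 6, -1000, 1000)); // prints false: e.g. hash 6 moves from task 2 to task 0
    }
}
```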


On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin  wrote:

> Hey Jacob,
>
> Thanks for the explanation. It seems that your biggest concern is with the
> generality of the proposal. Let me try to address this and other comments
> below.
>
> 1) ... it will cause headaches for Samza users ...
>
> I am not sure I understand why this proposal causes headaches for Samza
> users. Here is the impact of SEP-5 on users:
>
> - For users that do not need partition expansion of the input stream, they
> can use Samza without any change in code/binary/config. Thus there is no
> headache for them.
>
> - For users that need partition expansion of the input streams for
> stateless jobs, they currently need to manually restart their Samza job in
> order to let Samza consume the new partitions created for the stream. SEP-5
> actually reduces their headache by allowing Samza to automatically detect
> and consume new partitions.
>
> - For users that need partition expansion of the input streams for stateful
> jobs, they have a really big headache in the sense that Samza does not allow
> partition expansion for stateful jobs. SEP-5 addresses this headache for
> them.
>
> You are right that SEP-5 requires users to understand and enforce
> limitations across organizations. But it is still much better than not
> allowing users to expand partitions for stateful jobs at all, right? Did I
> miss something here?
>
> 2) ... Separate orgs are often difficult to coordinate and a system which
> depends on such significant process/coordination is too fragile for my
> taste ..
>
> This is true. Ideally we want a system that is fully self-serving. I think
> this is a long term goal for Samza. Still, for the reasons described above,
> I think something is better than nothing. I am open to alternative design
> that can support partition expansion for stateful jobs without requiring
> coordination.
>
> 3) There is currently no supported way of sharing state among the tasks of
> a container.  Each task has its own isolated store and that logical
> isolation is the primary thing that enables Samza jobs to scale with a
> simple conta

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-12 Thread Dong Lin
Thanks, Xinyu, for offering a solution.

Yeah, we have actually listed it as the third rejected alternative in SEP-5.
I can move this to future work. I think it is actually a great idea to
support more general partition expansion and I think this is what we should
do for Samza in the long term.

While this pluggable function enables support for more input systems, it
alone won't address Jacob's concern about coordination across
organizations. This is because users would still need to coordinate with the
upstream organization and manually configure their Samza job to specify a
new-to-old-partition mapping to be consistent with the mapping enforced in
the input system. We can make this self-serving if the input system
provides an interface for Samza to dynamically fetch the
new-to-old-partition mapping. But it needs to be future work since no
existing input system provides this interface.

On Mon, Jun 12, 2017 at 4:54 PM, xinyu liu  wrote:

> How about making the partition mapping function a pluggable component in
> the partition expansion? Mathematically, this is a mapping function which
> is able to map the new partitions to the old ones:
>
>   *f (new partition) -> old partition*
>
> If the function is a surjective function (
> https://en.wikipedia.org/wiki/Surjective_function), we are able to keep
> the tasks as they were by replacing the old partition assignment with the
> new one using the mapping function. By making this function pluggable,
> users can provide their own mapping functions to make this work for
> different kinds of input systems. Samza should check whether the function
> is surjective so it knows whether we can keep the same task count with
> different grouping. For Kafka, we can provide a simple modulo function as
> the mapping, and it's surjective. I agree it's very nice to have more
> general support to be able to split the states of tasks and expand the
> changelog etc, but this SEP is still useful and can address quite a large
> number of scenarios in practice, do you agree?
>
> Thanks,
> Xinyu
>
> On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes  wrote:
>
> > Hey Dong,
> >
> > I'm opposed (or a +0, at best) to this limited, Kafka-specific solution. I
> > understand that the proposal is relatively simple to implement, but I think
> > it will cause headaches for Samza users. They will not only have to
> > understand all the limitations (increase only, double partitions only,
> > partition using hash+modulo, etc) of this approach, but enforcing these
> > limitations can be a major problem, especially when the Samza jobs and
> > message brokers are managed by separate orgs in a company. Separate orgs
> > are often difficult to coordinate and a system which depends on such
> > significant process/coordination is too fragile for my taste.
> >
> > That said, I realize that my opinion is just one of many in the broader
> > community which may feel differently, so let me respond to some of the
> > other items in the discussion so we can clear them up:
> >
> > > The task-to-container assignment matters because the correlated tasks
> > > (i.e. tasks that consume messages with the same key) need to be in the
> > > same container so that they can share the same key/value local store on
> > > the same physical machine.
> >
> > There is currently no supported way of sharing state among the tasks of a
> > container. Each task has its own isolated store and that logical isolation
> > is the primary thing that enables Samza jobs to scale with a simple
> > container count change. My feeling is that we should not change this
> > without good reason.
> >
> > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such
> > > that an exception will not be thrown if the new grouper is
> > > GroupByPartitionWithFixedTaskNum and the old grouper is
> > > GroupByPartition. Does this look reasonable?
> >
> > With the current proposal, we'd also need a similar check for
> > GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any other
> > groupers were later added with both these modes, we'd probably need to add
> > those too. It might be easier and cleaner to add a config to ignore that
> > check temporarily. The downside is that it further complicates the Samza
> > config, which is already huge. Thoughts?
> >
> > > I think storing the previous task-to-partition mapping is more general
> > > than storing the partition count of all topics for the following
> > > reasons:
> > > - Samza already stores the task-to-container mapping and
> > > container-to-host mapping in the coordinator stream. It seems
> > > consistent to also store the partition-to-task mapping. And this
> > > information may be useful for other use cases such as debugging.
> > > - By having the new interface take the previous task-to-partition
> > > assignment instead of a topic-to-partition-count mapping as new
> > p

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-12 Thread Dong Lin
Hey Jacob,

Thanks for the explanation. It seems that your biggest concern is with the
generality of the proposal. Let me try to address this and other comments
below.

1) ... it will cause headaches for Samza users ...

I am not sure I understand why this proposal causes headaches for Samza
users. Here is the impact of SEP-5 on users:

- For users that do not need partition expansion of the input stream, they
can use Samza without any change in code/binary/config. Thus there is no
headache for them.

- For users that need partition expansion of the input streams for
stateless jobs, they currently need to manually restart their Samza job in
order to let Samza consume the new partitions created for the stream. SEP-5
actually reduces their headache by allowing Samza to automatically detect
and consume new partitions.

- For users that need partition expansion of the input streams for stateful
jobs, they have a really big headache in the sense that Samza does not allow
partition expansion for stateful jobs. SEP-5 addresses this headache for
them.

You are right that SEP-5 requires users to understand and enforce
limitations across organizations. But it is still much better than not
allowing users to expand partitions for stateful jobs at all, right? Did I
miss something here?
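The automatic detection mentioned for stateless jobs could, as a rough sketch, amount to comparing the partition counts recorded for the previous run with the counts the input system reports now. How the counts are fetched and persisted (for example in the coordinator stream) and how the job reacts afterwards are assumptions left out here; the class, method and stream names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: report which input streams gained partitions since the
// previous run, given previously recorded and currently observed counts.
final class PartitionExpansionDetector {

    // Returns stream name -> number of newly added partitions, for streams whose
    // partition count increased. New streams and unchanged streams are ignored.
    static Map<String, Integer> expandedStreams(Map<String, Integer> previousCounts,
                                                Map<String, Integer> currentCounts) {
        Map<String, Integer> expanded = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : currentCounts.entrySet()) {
            Integer before = previousCounts.get(entry.getKey());
            if (before != null && entry.getValue() > before) {
                expanded.put(entry.getKey(), entry.getValue() - before);
            }
        }
        return expanded;
    }

    public static void main(String[] args) {
        Map<String, Integer> previous = Map.of("page-views", 4, "clicks", 8);
        Map<String, Integer> current = Map.of("page-views", 8, "clicks", 8);
        System.out.println(expandedStreams(previous, current)); // prints {page-views=4}
    }
}
```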

2) ... Separate orgs are often difficult to coordinate and a system which
depends on such significant process/coordination is too fragile for my
taste ..

This is true. Ideally we want a system that is fully self-serving. I think
this is a long term goal for Samza. Still, for the reasons described above,
I think something is better than nothing. I am open to alternative designs
that can support partition expansion for stateful jobs without requiring
coordination.

3) There is currently no supported way of sharing state among the tasks of
a container. Each task has its own isolated store and that logical
isolation is the primary thing that enables Samza jobs to scale with a
simple container count change. My feeling is that we should not change
this without good reason.

I see your point. I will remove this sentence from the motivation section.
This won't have any impact on the design of the SEP-5. Does this address
the problem?

4) With the current proposal, we'd also need a similar check for
GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any other groupers
were later added with both these modes, we'd probably need to add those
too. It might be easier and cleaner to add a config to ignore that check
temporarily. The downside is that it further complicates the Samza config,
which is already huge. Thoughts?

Yes, we need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum
as well. If more grouper classes are needed in the future, we can
solve this problem cleanly without new config. Given the
previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will throw an
exception if and only if newGrouperClass is not previousGrouperClass or a
subclass of it. GroupBySystemStreamPartitionWithFixedTaskNum should extend
GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum should
extend GroupByPartition. Does this address your concern?
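The compatibility rule discussed here, that switching to a grouper which extends the previous one (such as GroupByPartitionWithFixedTaskNum extending GroupByPartition) should not trigger the exception, can be sketched with Class.isAssignableFrom. The grouper classes below are empty stand-ins carrying the names used in this thread, and validateGrouperChange is a hypothetical helper; the actual check in KafkaCheckpointLogKey is not reproduced.

```java
// Hypothetical stand-ins for the grouper classes named in this thread; they are
// empty placeholders, not Samza's real implementations.
class GroupByPartition {}
class GroupByPartitionWithFixedTaskNum extends GroupByPartition {}
class GroupBySystemStreamPartition {}
class GroupBySystemStreamPartitionWithFixedTaskNum extends GroupBySystemStreamPartition {}

final class GrouperChangeCheck {

    // Reject a grouper change unless the new grouper is the previous grouper
    // class itself or a subclass of it (e.g. its "fixed task number" variant).
    static void validateGrouperChange(Class<?> previousGrouperClass, Class<?> newGrouperClass) {
        if (!previousGrouperClass.isAssignableFrom(newGrouperClass)) {
            throw new IllegalStateException("Grouper changed from " + previousGrouperClass.getSimpleName()
                    + " to incompatible " + newGrouperClass.getSimpleName());
        }
    }

    public static void main(String[] args) {
        // Allowed: the fixed-task variant extends the previous grouper.
        validateGrouperChange(GroupByPartition.class, GroupByPartitionWithFixedTaskNum.class);
        try {
            validateGrouperChange(GroupByPartition.class, GroupBySystemStreamPartitionWithFixedTaskNum.class);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Using a subclass relationship keeps the rule generic, so a future grouper with a fixed-task variant would not need another hardcoded case or a new config.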

5) The task-to-container and container-to-host mappings are both meaningful
in context of the JobModel. Partition-to-task mapping is not meaningful without
some definition of the key-to-partition assignments. It's incomplete
information and therefore misleading. I think it only makes sense to use
this mapping if we adopt a solution wherein Samza also knows the partition
key assignment.

The partition-to-task mapping is currently passed explicitly from the job
coordinator to each task as part of the job model, to tell tasks which
partitions to consume from. I think we can store some definition of the
key-to-partition assignments if Samza decides to get and use this information
in the future. Can you be more specific about why the partition-to-task
mapping is not meaningful without some definition of the key-to-partition
assignments and why it is incomplete and misleading?
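As a rough illustration of how a stored previous partition-to-task assignment could be reused when the task count is kept fixed, each new partition can be given the task of the old partition it maps back to. The new-to-old mapping, the task names and the helper are hypothetical; the result only places keys correctly if that mapping matches how the producer actually partitions keys, which is exactly the concern raised in this exchange.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntUnaryOperator;

// Hypothetical sketch: keep the task count fixed after partition expansion by
// assigning each new partition to the task that consumed the old partition it
// maps back to.
final class FixedTaskAssignmentSketch {

    static Map<Integer, String> assignNewPartitions(Map<Integer, String> previousAssignment,
                                                    int newPartitionCount,
                                                    IntUnaryOperator newToOld) {
        Map<Integer, String> assignment = new TreeMap<>();
        for (int p = 0; p < newPartitionCount; p++) {
            int oldPartition = newToOld.applyAsInt(p);
            String task = previousAssignment.get(oldPartition);
            if (task == null) {
                throw new IllegalArgumentException("No previous task for old partition " + oldPartition);
            }
            assignment.put(p, task);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<Integer, String> previous = Map.of(0, "task-0", 1, "task-1");
        // Expand from 2 to 4 partitions using a modulo new-to-old mapping.
        System.out.println(assignNewPartitions(previous, 4, p -> p % 2));
        // prints {0=task-0, 1=task-1, 2=task-0, 3=task-1}
    }
}
```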


Thanks,
Dong

On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes  wrote:

> Hey Dong,
>
> I'm opposed (or a +0, at best) to this limited, Kafka-specific solution. I
> understand that the proposal is relatively simple to implement, but I think
> it will cause headaches for Samza users. They will not only have to
> understand all the limitations (increase only, double partitions only,
> partition using hash+modulo, etc) of this approach, but enforcing these
> limitations can be a major problem, especially when the Samza jobs and
> message brokers are managed by separate orgs in a company. Separate orgs
> are often difficult to coordinate and a system which depends on such
> significant process/coordination is too fragile for my taste.
>
> That said, I realize that my opinion is just one of many in the broader
> community which may feel differently, so let me respond to some of the
> other items in the discussion so we can clear them up:
>
> The tas

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-12 Thread xinyu liu
How about making the partition mapping function a pluggable component in
the partition expansion? Mathematically, this is a mapping function which
is able to map the new partitions to the old ones:

  *f (new partition) -> old partition*

If the function is a surjective function (
https://en.wikipedia.org/wiki/Surjective_function), we are able to keep the
tasks as they were by replacing the old partition assignment with the new
one using the mapping function. By making this function pluggable, users
can provide their own mapping functions to make this work for different
kinds of input systems. Samza should check whether the function is
surjective so it knows whether we can keep the same task count with
different grouping. For Kafka, we can provide a simple modulo function as
the mapping, and it's surjective. I agree it's very nice to have more
general support to be able to split the states of tasks and expand the
changelog etc, but this SEP is still useful and can address quite a large
number of scenarios in practice, do you agree?
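The surjectivity check suggested above might look like the following minimal Java sketch, where the pluggable function is modelled as an IntUnaryOperator from new partition to old partition. The class and method names are hypothetical, and the check only validates the partition-level mapping, not whether the producer actually routes keys that way.

```java
import java.util.function.IntUnaryOperator;

// Hypothetical sketch: a mapping f(new partition) -> old partition is usable for
// keeping the task count only if every value is a valid old partition and every
// old partition is hit at least once (i.e. f is surjective).
final class PartitionMappingCheck {

    static boolean isSurjectiveMapping(IntUnaryOperator f, int newPartitionCount, int oldPartitionCount) {
        boolean[] covered = new boolean[oldPartitionCount];
        for (int p = 0; p < newPartitionCount; p++) {
            int old = f.applyAsInt(p);
            if (old < 0 || old >= oldPartitionCount) {
                return false; // maps outside the old partition range
            }
            covered[old] = true;
        }
        for (boolean c : covered) {
            if (!c) {
                return false; // some old partition (and its task) would receive no input
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int oldCount = 4;
        System.out.println(isSurjectiveMapping(p -> p % oldCount, 8, oldCount)); // prints true
        System.out.println(isSurjectiveMapping(p -> p / 3, 8, oldCount));        // prints false: old partition 3 is never hit
    }
}
```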

Thanks,
Xinyu

On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes  wrote:

> Hey Dong,
>
> I'm opposed (or a +0, at best) to this limited, Kafka-specific solution. I
> understand that the proposal is relatively simple to implement, but I think
> it will cause headaches for Samza users. They will not only have to
> understand all the limitations (increase only, double partitions only,
> partition using hash+modulo, etc) of this approach, but enforcing these
> limitations can be a major problem, especially when the Samza jobs and
> message brokers are managed by separate orgs in a company. Separate orgs
> are often difficult to coordinate and a system which depends on such
> significant process/coordination is too fragile for my taste.
>
> That said, I realize that my opinion is just one of many in the broader
> community which may feel differently, so let me respond to some of the
> other items in the discussion so we can clear them up:
>
> > The task-to-container assignment matters because the correlated tasks
> > (i.e. tasks that consume messages with the same key) need to be in the
> > same container so that they can share the same key/value local store on
> > the same physical machine.
>
> There is currently no supported way of sharing state among the tasks of a
> container.  Each task has its own isolated store and that logical isolation
> is the primary thing that enables Samza jobs to scale with a simple
> container count change. My feeling is that we should not change this
> without good reason.
>
> > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that
> > an exception will not be thrown if the new grouper is
> > GroupByPartitionWithFixedTaskNum and the old grouper is GroupByPartition.
> > Does this look reasonable?
>
> With the current proposal, we'd also need a similar check for
> GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any other
> groupers were later added with both these modes, we'd probably need to add
> those too. It might be easier and cleaner to add a config to ignore that
> check temporarily. The downside is that it further complicates the Samza
> config, which is already huge. Thoughts?
>
> > I think storing the previous task-to-partition mapping is more general than
> > storing the partition count of all topics for the following reasons:
> > - Samza already stores the task-to-container mapping and container-to-host
> > mapping in the coordinator stream. It seems consistent to also store the
> > partition-to-task mapping. And this information may be useful for other
> > use cases such as debugging.
> > - By having the new interface take the previous task-to-partition
> > assignment instead of a topic-to-partition-count mapping as a new
> > parameter, we can potentially have grouper implementations that support
> > other types of input systems.
> > - It is slightly simpler to store the task-to-partition assignment because
> > we don't need to know whether this is the first time a job is started or
> > not. On the other hand, you can write the topic-to-partition-count mapping
> > to the coordinator stream only if this is the first time the job is run
>
> The task-to-container and container-to-host mappings are both meaningful in
> context of the JobModel. Partition-to-task mapping is not meaningful
> without some definition of the key-to-partition assignments. It's
> incomplete information and therefore misleading. I think it only makes
> sense to use this mapping if we adopt a solution wherein Samza also knows
> the partition key assignment.
>
> -Jake

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-12 Thread Jacob Maes
Hey Dong,

I'm opposed (or a +0, at best) to this limited, Kafka-specific solution. I
understand that the proposal is relatively simple to implement, but I think
it will cause headaches for Samza users. They will not only have to
understand all the limitations (increase only, double partitions only,
partition using hash+modulo, etc.) of this approach, but enforcing these
limitations can be a major problem, especially when the Samza jobs and
message brokers are managed by separate orgs in a company. Separate orgs
are often difficult to coordinate and a system which depends on such
significant process/coordination is too fragile for my taste.

That said, I realize that my opinion is just one of many in the broader
community which may feel differently, so let me respond to some of the
other items in the discussion so we can clear them up:

> The task-to-container assignment matters because the correlated tasks
> (i.e. tasks that consume messages with the same key) need to be in the
> same container so that they can share the same key/value local store on the
> same physical machine.

There is currently no supported way of sharing state among the tasks of a
container.  Each task has its own isolated store and that logical isolation
is the primary thing that enables Samza jobs to scale with a simple
container count change. My feeling is that we should not change this
without good reason.

> I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that
> exception will not be thrown if new grouper is
> GroupByPartitionWithFixedTaskNum and old grouper is GroupByPartition. Does
> this look reasonable?

With the current proposal, we'd also need a similar check for
GroupBySystemStreamPartitionWithFixedTaskNum. And if any other groupers
were later added with both these modes, we'd probably need to add those
too. It might be easier and cleaner to add a config to ignore that check
temporarily. The downside is that it further complicates the Samza config,
which is already huge. Thoughts?
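To make the two options concrete, here is a minimal sketch (not Samza's actual checkpoint code) of a startup check that either consults a hard-coded table of permitted grouper switches or is bypassed by one override flag. The class name, the `skipCheck` flag, and the use of plain grouper names as strings are all hypothetical.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch (not Samza's actual code): decide whether a job may
// start when the grouper recorded in the checkpoint topic differs from the
// grouper currently configured. Arguments are assumed to be non-null.
final class GrouperChangeCheck {

  // Option A: hard-coded list of permitted switches, old grouper -> new groupers.
  // Each future "WithFixedTaskNum" grouper would need its own entry here.
  static final Map<String, Set<String>> ALLOWED_TRANSITIONS = Map.of(
      "GroupByPartition", Set.of("GroupByPartitionWithFixedTaskNum"),
      "GroupBySystemStreamPartition", Set.of("GroupBySystemStreamPartitionWithFixedTaskNum"));

  // Throws if the switch is not permitted; returns normally otherwise.
  // skipCheck models option B: a single config flag that bypasses the check.
  static void check(String checkpointedGrouper, String configuredGrouper, boolean skipCheck) {
    if (checkpointedGrouper.equals(configuredGrouper) || skipCheck) {
      return;
    }
    Set<String> allowed = ALLOWED_TRANSITIONS.getOrDefault(checkpointedGrouper, Set.of());
    if (!allowed.contains(configuredGrouper)) {
      throw new IllegalStateException(
          "Grouper changed from " + checkpointedGrouper + " to " + configuredGrouper);
    }
  }
}
```

Option A grows by one entry per new grouper pair, which is the maintenance cost raised above; option B is a single flag but adds yet another setting to the config surface.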

> I think storing the previous task-to-partition mapping is more general than
> storing the partition count of all topics for the following reasons:
> - Samza already stores the task-to-container mapping and container-to-host
> mapping in the coordinator stream. It seems consistent to also store the
> partition-to-task mapping. And this information may be useful for other
> use cases such as debugging.
> - By having the new interface take the previous task-to-partition
> assignment instead of a topic-to-partition-count mapping as a new parameter,
> we can potentially have grouper implementations that support other types of
> input systems.
> - It is slightly simpler to store the task-to-partition assignment because
> we don't need to know whether this is the first time a job is started or
> not. On the other hand, you can write topic-to-partition-count mapping to
> the coordinator stream only if this is the first time the job is run.

The task-to-container and container-to-host mappings are both meaningful in
the context of the JobModel. The partition-to-task mapping is not meaningful
without some definition of the key-to-partition assignment. It's
incomplete information and therefore misleading. I think it only makes
sense to use this mapping if we adopt a solution wherein Samza also knows
the partition key assignment.

-Jake

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-07 Thread Dong Lin
Hey Jacob, Navina, Yi,

I am wondering if my answer has addressed your concerns. Can you let me know
if there are any remaining concerns with the SEP?

Thanks,
Dong

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-06 Thread Dong Lin
Hey Jacob,

Thanks for taking time to review the SEP.

I agree with you and Navina that the current SEP doesn't provide support for
arbitrary input systems and doesn't support partition shrinking. I think
the scope of this SEP is to support partition expansion for Kafka (the most
widely used input system of Samza) and keep the door open for partition
expansion support in other input systems. The current design can support any
system that meets the two operational requirements specified in the doc.

While it is possible to support more types of input systems, it will likely
add more complexity to the design. For example, the first alternative
solution you proposed requires broker-side support to negotiate the hash
algorithm. The second alternative solution requires a changelog partition
reshuffle, which carries its own design complexity and performance overhead.
There is a tradeoff between generality and complexity among these choices. I
like the current design because it is simple and addresses a big usage
scenario for us. We can add more complexity to generalize the design if it
enables important use cases. Does this sound reasonable?

Note that the "Rejected Alternative" section also mentions the possibility
of supporting a wider range of input systems by allowing users to specify
the new-partition to old-partition mapping. We are not doing it because 1)
we may have a better understanding of the design once we have a specific
second input system to support, and 2) the current design can be extended to
support general input systems. I think a similar argument can be applied to
explain why we don't have to support general input systems using the
potentially-good alternatives you mentioned.

I hope SEP-5 can be an important first step towards supporting partition
expansion of any input system.

To answer your questions about the current proposal:

>1. "An alternative solution is to allow task number to increase after
>partition expansion and uses a proper task-to-container assignment to make
>sure the Samza output is correct." What does the container have to do with
>stateful processing or output in general?

The task-to-container assignment matters because the correlated tasks
(i.e. tasks that consume messages with the same key) need to be in the
same container so that they can share the same key/value local store on the
same physical machine.

>2. When you use "Join" as an example, you basically mean multiple
>co-partitioned streams, right? This is opposed to multiple,
>independently-partitioned streams or a single stream. Would be nice to
>formulate the proposal in these more general terms.

I thought "join" is commonly used to refer to the join operation on
co-partitioned streams, but I may be wrong. I have updated the wiki to
explicitly mention "co-partitioned stream". Does this look better now?

>3. When switching SSP groupers, how will the users avoid the
>org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
>exception?

I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that
the exception is not thrown if the new grouper is
GroupByPartitionWithFixedTaskNum and the old grouper is GroupByPartition.
Does this look reasonable?

>4. Partition to task assignment is meaningless without key to partition
>mapping. The real semantics are captured in the external requirement for
>partitioning via hash+modulo. But in that case, iiuc, only the partition
>count matters. So why not just store the original partition count rather
>than the whole mapping?

I think storing the previous task-to-partition mapping is more general than
storing the partition count of all topics for the following reasons:

- Samza already stores the task-to-container mapping and container-to-host
mapping in the coordinator stream. It seems consistent to also store the
partition-to-task mapping. And this information may be useful for other
use cases such as debugging.

- By having the new interface take the previous task-to-partition
assignment instead of a topic-to-partition-count mapping as a new parameter,
we can potentially have grouper implementations that support other types of
input systems.

- It is slightly simpler to store the task-to-partition assignment because
we don't need to know whether this is the first time a job is started or
not. On the other hand, you can write topic-to-partition-count mapping to
the coordinator stream only if this is the first time the job is run.
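To compare the two storage options discussed here (previous mapping vs. original partition count), below is a minimal sketch assuming hash+modulo routing, expansion by doubling, and one task per partition before the first expansion. `FixedTaskAssignment`, both method names, and the "task-A"/"task-B" names are hypothetical; partitions are plain integers rather than Samza's SystemStreamPartition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: assign the partitions of an expanded input stream to
// the tasks that existed before the expansion, so the task count (and each
// task's local state) stays fixed. Assumes keys are routed by
// hash % partitionCount and that the partition count only grows by doubling.
final class FixedTaskAssignment {

  // Variant 1: only the original partition count is stored.
  // Task i owned partition i before any expansion.
  static Map<Integer, Integer> fromOriginalCount(int originalCount, int newCount) {
    Map<Integer, Integer> partitionToTask = new HashMap<>();
    for (int p = 0; p < newCount; p++) {
      partitionToTask.put(p, p % originalCount);
    }
    return partitionToTask;
  }

  // Variant 2: the previous partition-to-task mapping is stored.
  // Assumes the previous mapping's keys are exactly 0..previousCount-1.
  // New partition p inherits the task of old partition p % previousCount.
  static Map<Integer, String> fromPreviousMapping(Map<Integer, String> previous, int newCount) {
    int previousCount = previous.size();
    Map<Integer, String> partitionToTask = new HashMap<>();
    for (int p = 0; p < newCount; p++) {
      partitionToTask.put(p, previous.get(p % previousCount));
    }
    return partitionToTask;
  }
}
```

For a 2 to 4 expansion both variants place partition 3 with whoever owned partition 1. Under these assumptions they agree, which is the point of question 4; the stored mapping only carries extra information if the original grouping was not one task per partition.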

Thanks,
Dong

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-05 Thread Jacob Maes
Hey Dong,

Thanks for the SEP. Supporting partition changes is critically important
for stateful Samza jobs, so it's great to see some ideas on that front!

Sorry for the late feedback, but I have a few thoughts to contribute.

Big +1 on Navina's comment:

> My biggest gripe with this SEP is that it seems like a tailor-made
> solution that relies on the semantics of the Kafka system and yet, we are
> trying to masquerade that as operational requirements for other systems
> interacting with Samza. (Not to say that this is the first time such a
> choice is being made in the Samza design). I am not seeing how this can be
> a "general" solution for all input systems. That's my two cents. I would
> like to hear alternative points of view for this from other devs.


Two examples of this:
1. This is mostly hypothetical, but some message brokers may use key
range assignment rather than hash+modulo.
2. Kafka can't reduce the number of partitions, but it can happen on other
systems. For example, it may be cheaper to reduce the number of partitions
on a hosted service where the cost model depends on the number of
partitions/shards.

It seems to me that a solution which doesn't depend on partition key
assignment in the message broker would be preferable. Here are a few
alternatives that weren't discussed and I think should be considered:

Alternatives in order of increasing preference:
1. Samza manages the partition hash (via some new contract with the
brokers) and guarantees correct routing of keys among the new partitions.
2. Samza detects a task count change, creates a new changelog with correct
partitions, and *somehow* reshuffles all existing changelog data into the
new topic and then uses the new topic from then on. (doesn't work without
changelog, but in that case durability isn't paramount, so we can just wipe)
3. Use RPC in between stages and samza fully manages key assignment among
tasks. No on-disk topic data to clean up. Mandatory repartitioning in the
first stage to pre-scaled tasks in next stage.
4. Combined 2-3 solution
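Alternative 2 can be sketched roughly as follows, modeling each changelog partition as an in-memory map of the latest value per key (i.e. a compacted changelog). `ChangelogReshuffle` and its methods are hypothetical, and `String.hashCode()` is only a stand-in partitioner; Kafka's default partitioner hashes the serialized key with murmur2, so a real implementation would have to match whatever function the producers use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "reshuffle the changelog": read every key/value from
// the old changelog partitions and rewrite it into a new changelog whose
// partition count matches the new task count.
final class ChangelogReshuffle {

  // Stand-in partitioner; floorMod keeps negative hash codes non-negative.
  static int partitionFor(String key, int partitionCount) {
    return Math.floorMod(key.hashCode(), partitionCount);
  }

  static List<Map<String, String>> reshuffle(List<Map<String, String>> oldPartitions, int newCount) {
    List<Map<String, String>> newPartitions = new ArrayList<>();
    for (int i = 0; i < newCount; i++) {
      newPartitions.add(new HashMap<>());
    }
    // Route every entry by its key, regardless of which old partition held it.
    for (Map<String, String> partition : oldPartitions) {
      for (Map.Entry<String, String> e : partition.entrySet()) {
        newPartitions.get(partitionFor(e.getKey(), newCount)).put(e.getKey(), e.getValue());
      }
    }
    return newPartitions;
  }
}
```

Because entries are re-routed by key alone, this works for any new count, including a shrink. The price is rewriting the whole changelog, which is the performance overhead mentioned in the reply.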

Finally, some questions about the current proposal:
1. "An alternative solution is to allow task number to increase after
partition expansion and uses a proper task-to-container assignment to make
sure the Samza output is correct." What does the container have to do with
stateful processing or output in general?
2. When you use "Join" as an example, you basically mean multiple
co-partitioned streams, right? This is opposed to multiple,
independently-partitioned streams or a single stream. Would be nice to
formulate the proposal in these more general terms.
3. When switching SSP groupers, how will the users avoid the
org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
exception?
4. Partition to task assignment is meaningless without key to partition
mapping. The real semantics are captured in the external requirement for
partitioning via hash+modulo. But in that case, iiuc, only the partition
count matters. So why not just store the original partition count rather
than the whole mapping?

-Jake

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-04 Thread Dong Lin
Hey Yi, Navina,

I have updated the SEP-5 document based on our discussion. The difference
can be found here.
Here is the summary of changes:

- Added a new interface that extends the existing interface
SystemStreamPartitionGrouper. Newly-added grouper classes should implement
this interface.
- Explained in the Rejected Alternative section why we don't add a new
method to the existing interface.
- Explained in the Rejected Alternative section why we don't add a
config/class for users to specify the new-partition to old-partition mapping.

Can you take another look at the proposal and let me know if there is any
concern?

Cheers,
Dong


Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-01 Thread Dong Lin
Hey Yi,

Thanks much for the comment. I have updated the doc to address all your
comments except the one related to the interface. I am not sure I
understand your suggestion of the new interface. Will discuss tomorrow.

Thanks,
Dong

On Wed, May 31, 2017 at 4:29 PM, Yi Pan  wrote:

> Hi, Dong,
>
> Thanks for the detailed design doc for a long-awaited feature in Samza!
> Really appreciate it! I did a quick pass and have the following comments:
>
> - minor: "limit the maximum size of partition" ==> "limit the maximum size
> of each partition"
> - "However, Samza currently is not able to handle partition expansion of
> the input streams"==>better point out "for stateful jobs". For stateless
> jobs, simply bouncing the job now can pick up the new partitions.
> - "it is possible (e.g. with Kafka) that messages with a given key exists
> in both partition 1 and 3. Because GroupByPartition will assign partition 1
> and 3 to different tasks, messages with the same key may be handled by
> different task/container/process and their state will be stored in
> different changelog partition." The problem statement is not super clear
> here. The issue with stateful jobs is: after GroupByPartition assigns
> partitions 1 and 3 to different tasks, the new task handling partition 3
> does not have the previous state to resume the work. e.g. a page-key based
> counter would start from 0 in the new task for a specific key, instead of
> resuming the previous count 50 held by task 1.
> - minor rewording: "the first solution in this doc" ==> "the solution
> proposed in this doc"
> - "Thus additional development work is needed in Kafka to meet this
> requirement" It would be good to link to a KIP if and when it exists
> - Instead of touching/deprecating the interface
> SystemStreamPartitionGrouper, I would recommend having a different
> implementation class of the interface, which in the constructor of the
> grouper, takes two parameters: a) the previous task number read from the
> coordinator stream; b) the configured new-partition to old-partition
> mapping policy. Then, the grouper's interface method stays the same and the
> behavior of the grouper is more configurable which is good to support a
> broader set of use cases in addition to Kafka's built-in partition
> expansion policies.
> - Minor renaming suggestion to the new grouper class names:
> GroupByPartitionWithFixedTaskNum
> and GroupBySystemStreamPartitionWithFixedTaskNum
>
> Thanks!
>
> - Yi
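The interface suggestion above can be sketched as a grouper whose constructor takes (a) the previous task count and (b) a new-partition to old-partition mapping policy, while `group` keeps the same input/output shape. This is a standalone approximation: `FixedTaskNumGrouper`, its nested `Ssp` stand-in for SystemStreamPartition, the "Partition N" task names, and the "page-views" stream are all hypothetical, and the previous task count doubles as the old partition count (one task per partition before expansion).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.IntBinaryOperator;

// Hypothetical sketch: a grouper that keeps the task count fixed after the
// input stream's partitions are expanded.
final class FixedTaskNumGrouper {

  // Stand-in for a SystemStreamPartition (stream name + partition id).
  static final class Ssp {
    final String stream;
    final int partition;

    Ssp(String stream, int partition) {
      this.stream = stream;
      this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Ssp)) {
        return false;
      }
      Ssp other = (Ssp) o;
      return stream.equals(other.stream) && partition == other.partition;
    }

    @Override
    public int hashCode() {
      return 31 * stream.hashCode() + partition;
    }
  }

  private final int previousTaskCount;           // (a) read from the coordinator stream
  private final IntBinaryOperator mappingPolicy;  // (b) (newPartition, previousTaskCount) -> old partition

  FixedTaskNumGrouper(int previousTaskCount, IntBinaryOperator mappingPolicy) {
    this.previousTaskCount = previousTaskCount;
    this.mappingPolicy = mappingPolicy;
  }

  // Same shape as the existing grouper method: input partitions in, task -> partitions out.
  Map<String, Set<Ssp>> group(Set<Ssp> ssps) {
    Map<String, Set<Ssp>> groups = new HashMap<>();
    for (Ssp ssp : ssps) {
      int oldPartition = mappingPolicy.applyAsInt(ssp.partition, previousTaskCount);
      groups.computeIfAbsent("Partition " + oldPartition, k -> new HashSet<>()).add(ssp);
    }
    return groups;
  }
}
```

With a Kafka-style policy `(p, n) -> p % n` and a 2 to 4 expansion, partition 3 lands in the task that already owns partition 1. In the page-key counter example, that task still holds the count of 50, so counting resumes there instead of restarting from 0.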
>
> On Wed, May 31, 2017 at 10:33 AM, Dong Lin  wrote:
>
> > Hey Navina,
> >
> > Thanks much for the comment. Please see my response below.
> >
> > Regarding your biggest gripe with the SEP, I personally think the
> > operational requirements proposed in the SEP are pretty general and could
> > be easily enforced by other systems. The reason is that the modulo
> > operation is pretty standard and the default option when choosing a
> > partition. And usually the underlying system allows users to select an
> > arbitrary partition number if it supports partition expansion. Do you know
> > any system that does not meet these two requirements?
> >
> > Regarding your comment of the Motivation section, I renamed the first
> > section as "Problem and Goal" and specified that "*The goal of this
> > proposal is to enable partition expansion of the input streams*.". I also
> > put a sentence at the end of the Motivation section that "*The feature of
> > task expansion is out of the scope of this proposal and will be addressed
> > in a future SEP*". The second paragraph in the Motivation section is
> > mainly used to explain the thinking process that we have gone through,
> > what other alternatives we have considered, and what we plan to do in
> > Samza in the next step.
> >
> > To answer your question why increasing the partition number will increase
> > the throughput of the Kafka consumer in the container, the Kafka consumer
> > can potentially fetch more data in one FetchResponse with more partitions
> > in the FetchRequest. This is because we limit the maximum amount of data
> > that can be fetched for a given partition in the FetchResponse. This by
> > default is set to 1 MB. And there are reasons why we cannot arbitrarily
> > raise this limit.
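As a back-of-the-envelope check of the throughput argument: if each partition contributes at most a fixed number of bytes to one fetch response, the ceiling per fetch grows linearly with the number of partitions read. The sketch assumes the 1 MB per-partition default mentioned above (for the Kafka Java consumer the per-partition setting is `max.partition.fetch.bytes`; newer clients also cap the whole response with `fetch.max.bytes`, modeled here as an optional overall cap). `FetchBound` is a hypothetical name.

```java
// Upper bound on bytes returned by one fetch, assuming each partition
// contributes at most perPartitionLimit bytes and the response as a whole is
// capped by overallLimit (pass Long.MAX_VALUE for no overall cap).
final class FetchBound {
  static long maxBytesPerFetch(int partitions, long perPartitionLimit, long overallLimit) {
    return Math.min((long) partitions * perPartitionLimit, overallLimit);
  }
}
```

Going from 4 to 8 partitions raises the ceiling from 4 MB to 8 MB per fetch. Actual throughput also depends on how much data is available in each partition at fetch time.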
> >
> > To answer your question how partition expansion in Kafka impacts the
> > clients, the Kafka consumer is able to automatically detect new partitions
> > of the topic and reassign all (both old and new) partitions across
> > consumers in the consumer group IF you tell the consumer the topic to be
> > subscribed. But the consumer in Samza's container uses another way of
> > subscription. Instead of subscribing to the topic, the consumer in Samza's
> > container subscribes to the specific partitions of the topic. In this case,
> > if new partitions have been added, Samza will need to explicitly subscribe
> > to the new partitions of the topic. The "Handle partition expansion while tasks are running"
> > section in the SEP addresses this issue in Samza -- it rec

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-06-01 Thread Dong Lin
Hey Navina,

My point is that if the underlying system allows users to select an
arbitrary partition count during partition expansion, which I assume is
true of all input systems that Samza will use, then we can easily enforce
the rule that partitions should always be expanded by doubling the
partition count. Does this make sense?
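A quick way to see why the doubling rule matters is to check the property the fixed-task groupers rely on: every key in new partition p must come from old partition p % oldCount. This sketch assumes the partitioner computes hash(key) mod partitionCount (Kafka's default partitioner for keyed messages is of this hash-then-modulo form) and tests a range of hash values directly. `ExpansionCheck` is a hypothetical name.

```java
// Checks, for a hash+modulo partitioner, whether every key in new partition p
// came from old partition p % oldCount. This holds whenever newCount is a
// multiple of oldCount, doubling being the simplest case.
final class ExpansionCheck {

  static int partition(int keyHash, int count) {
    return Math.floorMod(keyHash, count);
  }

  // Tests every hash value in [-sampleSize, sampleSize].
  static boolean preservesOldPartition(int oldCount, int newCount, int sampleSize) {
    for (int h = -sampleSize; h <= sampleSize; h++) {
      int oldP = partition(h, oldCount);
      int newP = partition(h, newCount);
      if (newP % oldCount != oldP) {
        return false;
      }
    }
    return true;
  }
}
```

Going from 2 to 4 or 4 to 8 partitions keeps the property. Going from 2 to 3 breaks it: a key with hash 3 moves from old partition 1 to new partition 0, which the p % 2 rule would hand to the task that owned partition 0.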

Thanks,
Dong

On Wed, May 31, 2017 at 4:57 PM, Navina Ramesh (Apache) wrote:

> Dong,
>
> Thanks for your prompt responses.
>
> > And usually the underlying system allows users to select an arbitrary
> > partition number if it supports partition expansion. Do you know any
> > system that does not meet these two requirements?
>
> I am not aware of a system that won't meet the modulo requirement. I was
> mostly questioning the requirement around *Stream Management* - which
> expects the expansion of partitions to always happen by doubling the
> partition count. That is different from saying "underlying system allows
> user to select arbitrary partition number if it supports partition
> expansion". Please correct me if I have misunderstood what you meant in
> that requirement :)
>
> > Regarding your comment on the Motivation section
>
> Thanks for updating it.
>
> > , the Kafka consumer can potentially fetch more data in one FetchResponse
> > with more partitions in the FetchRequest. This is because we limit the
> > maximum amount of data that can be fetched for a given partition in the
> > FetchResponse.
>
> That makes sense. I didn't know that you had this reasoning in mind. Thanks
> for explaining.
>
> > To answer your question of how partition expansion in Kafka impacts the
> > clients: the Kafka consumer is able to automatically detect new partitions
> > of the topic and reassign all (both old and new) partitions across
> > consumers in the consumer group IF you tell the consumer which topic to
> > subscribe to. But the consumer in Samza's container uses another way of
> > subscription.
>
> Got it.
>
> Thanks!
> Navina
>
>
>
> On Wed, May 31, 2017 at 4:29 PM, Yi Pan  wrote:
>
> > Hi, Dong,
> >
> > Thanks for the detailed design doc for a long-awaited feature in Samza!
> > Really appreciate it! I did a quick pass and have the following comments:
> >
> > - minor: "limit the maximum size of partition" ==> "limit the maximum
> > size of each partition"
> > - "However, Samza currently is not able to handle partition expansion of
> > the input streams" ==> better to point out "for stateful jobs". For
> > stateless jobs, simply bouncing the job now can pick up the new partitions.
> > - "it is possible (e.g. with Kafka) that messages with a given key exist
> > in both partition 1 and 3. Because GroupByPartition will assign partition
> > 1 and 3 to different tasks, messages with the same key may be handled by
> > different task/container/process and their state will be stored in
> > different changelog partitions." The problem statement is not super clear
> > here. The issue with stateful jobs is: after GroupByPartition assigns
> > partitions 1 and 3 to different tasks, the new task handling partition 3
> > does not have the previous state to resume the work, e.g. a page-key
> > based counter would start from 0 in the new task for a specific key,
> > instead of resuming the previous count of 50 held by task 1.
> > - minor rewording: "the first solution in this doc" ==> "the solution
> > proposed in this doc"
> > - "Thus additional development work is needed in Kafka to meet this
> > requirement": it would be good to link to a KIP if and when it exists.
> > - Instead of touching/deprecating the interface
> > SystemStreamPartitionGrouper, I would recommend having a different
> > implementation class of the interface whose constructor takes two
> > parameters: a) the previous task number read from the coordinator
> > stream; b) the configured new-partition to old-partition mapping policy.
> > Then the grouper's interface method stays the same, and the behavior of
> > the grouper becomes more configurable, which helps support a broader set
> > of use cases in addition to Kafka's built-in partition expansion
> > policies.
> > - Minor renaming suggestion to the new grouper class names:
> > GroupByPartitionWithFixedTaskNum
> > and GroupBySystemStreamPartitionWithFixedTaskNum
> >
> > Thanks!
> >
> > - Yi
> >
> > On Wed, May 31, 2017 at 10:33 AM, Dong Lin  wrote:
> >
> > > Hey Navina,
> > >
> > > Thanks much for the comment. Please see my response below.
> > >
> > > Regarding your biggest gripe with the SEP, I personally think the
> > > operational requirements proposed in the SEP are pretty general and
> > > could be easily enforced by other systems. The reason is that the modulo
> > > operation is pretty standard and the default option when we choose a
> > > partition. And usually the underlying system allows the user to select
> > > an arbitrary partition number if it supports partition expansion. Do you
> > > know any system that does not meet these two requirements?
> > >
> > > Regarding your commen

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-05-31 Thread Navina Ramesh (Apache)
Dong,

Thanks for your prompt responses.

>  And usually the underlying system allows the user to select an arbitrary
> partition number if it supports partition expansion. Do you know any system
> that does not meet these two requirements?

I am not aware of a system that won't meet the modulo requirement. I was
mostly questioning the requirement around *Stream Management* - which
expects the expansion of partitions to always happen by doubling the
partition count. That is different from saying "underlying system allows
user to select arbitrary partition number if it supports partition
expansion". Please correct me if I have misunderstood what you meant in
that requirement :)

> Regarding your comment on the Motivation section

Thanks for updating it.

> , the Kafka consumer can potentially fetch more data in one FetchResponse
> with more partitions in the FetchRequest. This is because we limit the
> maximum amount of data that can be fetched for a given partition in the
> FetchResponse.

That makes sense. I didn't know that you had this reasoning in mind. Thanks
for explaining.

> To answer your question of how partition expansion in Kafka impacts the
> clients: the Kafka consumer is able to automatically detect new partitions
> of the topic and reassign all (both old and new) partitions across
> consumers in the consumer group IF you tell the consumer which topic to
> subscribe to. But the consumer in Samza's container uses another way of
> subscription.

Got it.

Thanks!
Navina



On Wed, May 31, 2017 at 4:29 PM, Yi Pan  wrote:

> Hi, Dong,
>
> Thanks for the detailed design doc for a long-awaited feature in Samza!
> Really appreciate it! I did a quick pass and have the following comments:
>
> - minor: "limit the maximum size of partition" ==> "limit the maximum size
> of each partition"
> - "However, Samza currently is not able to handle partition expansion of
> the input streams" ==> better to point out "for stateful jobs". For
> stateless jobs, simply bouncing the job now can pick up the new partitions.
> - "it is possible (e.g. with Kafka) that messages with a given key exist
> in both partition 1 and 3. Because GroupByPartition will assign partition 1
> and 3 to different tasks, messages with the same key may be handled by
> different task/container/process and their state will be stored in
> different changelog partitions." The problem statement is not super clear
> here. The issue with stateful jobs is: after GroupByPartition assigns
> partitions 1 and 3 to different tasks, the new task handling partition 3
> does not have the previous state to resume the work, e.g. a page-key based
> counter would start from 0 in the new task for a specific key, instead of
> resuming the previous count of 50 held by task 1.
> - minor rewording: "the first solution in this doc" ==> "the solution
> proposed in this doc"
> - "Thus additional development work is needed in Kafka to meet this
> requirement": it would be good to link to a KIP if and when it exists.
> - Instead of touching/deprecating the interface
> SystemStreamPartitionGrouper, I would recommend having a different
> implementation class of the interface whose constructor takes two
> parameters: a) the previous task number read from the coordinator stream;
> b) the configured new-partition to old-partition mapping policy. Then the
> grouper's interface method stays the same, and the behavior of the grouper
> becomes more configurable, which helps support a broader set of use cases
> in addition to Kafka's built-in partition expansion policies.
> - Minor renaming suggestion to the new grouper class names:
> GroupByPartitionWithFixedTaskNum
> and GroupBySystemStreamPartitionWithFixedTaskNum
>
> Thanks!
>
> - Yi
>
> On Wed, May 31, 2017 at 10:33 AM, Dong Lin  wrote:
>
> > Hey Navina,
> >
> > Thanks much for the comment. Please see my response below.
> >
> > Regarding your biggest gripe with the SEP, I personally think the
> > operational requirements proposed in the SEP are pretty general and could
> > be easily enforced by other systems. The reason is that the modulo
> > operation is pretty standard and the default option when we choose a
> > partition. And usually the underlying system allows the user to select an
> > arbitrary partition number if it supports partition expansion. Do you know
> > any system that does not meet these two requirements?
> >
> > Regarding your comment on the Motivation section, I renamed the first
> > section to "Problem and Goal" and specified that "*The goal of this
> > proposal is to enable partition expansion of the input streams*." I also
> > put a sentence at the end of the Motivation section that "*The feature of
> > task expansion is out of the scope of this proposal and will be addressed
> > in a future SEP*". The second paragraph in the Motivation section is
> > mainly used to explain the thinking process that we have gone through,
> > what other alternatives we have considered, and what we plan to do in
> > Samza in the next step.
> >
> > To answer your question wh

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-05-31 Thread Yi Pan
Hi, Dong,

Thanks for the detailed design doc for a long-awaited feature in Samza!
Really appreciate it! I did a quick pass and have the following comments:

- minor: "limit the maximum size of partition" ==> "limit the maximum size
of each partition"
- "However, Samza currently is not able to handle partition expansion of
the input streams" ==> better to point out "for stateful jobs". For stateless
jobs, simply bouncing the job now can pick up the new partitions.
- "it is possible (e.g. with Kafka) that messages with a given key exist
in both partition 1 and 3. Because GroupByPartition will assign partition 1
and 3 to different tasks, messages with the same key may be handled by
different task/container/process and their state will be stored in
different changelog partitions." The problem statement is not super clear
here. The issue with stateful jobs is: after GroupByPartition assigns
partitions 1 and 3 to different tasks, the new task handling partition 3
does not have the previous state to resume the work, e.g. a page-key based
counter would start from 0 in the new task for a specific key, instead of
resuming the previous count of 50 held by task 1.
- minor rewording: "the first solution in this doc" ==> "the solution
proposed in this doc"
- "Thus additional development work is needed in Kafka to meet this
requirement": it would be good to link to a KIP if and when it exists.
- Instead of touching/deprecating the interface
SystemStreamPartitionGrouper, I would recommend having a different
implementation class of the interface whose constructor takes two
parameters: a) the previous task number read from the coordinator stream;
b) the configured new-partition to old-partition mapping policy. Then the
grouper's interface method stays the same, and the behavior of the grouper
becomes more configurable, which helps support a broader set of use cases
in addition to Kafka's built-in partition expansion policies.
- Minor renaming suggestion to the new grouper class names:
GroupByPartitionWithFixedTaskNum
and GroupBySystemStreamPartitionWithFixedTaskNum
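A minimal sketch of that suggestion, in Java. Everything here is hypothetical: the class name, the int-based partitions, the `Partition N` task names (chosen to resemble GroupByPartition's naming), and the use of `IntBinaryOperator` as the mapping-policy type. The point is only that the grouping method keeps its usual shape while the constructor carries the two extra inputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.IntBinaryOperator;

/**
 * Hypothetical grouper following the suggestion above. Partitions are plain ints and
 * task names are strings, standing in for Samza's SystemStreamPartition and TaskName.
 */
final class FixedTaskNumGrouper {

    /** Policy for modulo-hashed streams: new partition p belongs to old partition p mod taskNum. */
    static final IntBinaryOperator MODULO_POLICY =
            (newPartition, taskNum) -> newPartition % taskNum;

    private final int previousTaskNum;             // (a) read from the coordinator stream
    private final IntBinaryOperator mappingPolicy; // (b) configured new-to-old partition policy

    FixedTaskNumGrouper(int previousTaskNum, IntBinaryOperator mappingPolicy) {
        if (previousTaskNum <= 0) {
            throw new IllegalArgumentException("previousTaskNum must be positive");
        }
        this.previousTaskNum = previousTaskNum;
        this.mappingPolicy = mappingPolicy;
    }

    /** Same shape as an ordinary grouping call: partitions in, task-to-partitions map out. */
    Map<String, List<Integer>> group(Set<Integer> partitions) {
        Map<String, List<Integer>> tasks = new TreeMap<>();
        for (int partition : new TreeSet<>(partitions)) { // sorted for deterministic output
            int oldPartition = mappingPolicy.applyAsInt(partition, previousTaskNum);
            if (oldPartition < 0 || oldPartition >= previousTaskNum) {
                throw new IllegalStateException("policy mapped partition " + partition + " outside the old range");
            }
            tasks.computeIfAbsent("Partition " + oldPartition, k -> new ArrayList<>()).add(partition);
        }
        return tasks;
    }

    public static void main(String[] args) {
        FixedTaskNumGrouper grouper = new FixedTaskNumGrouper(2, MODULO_POLICY);
        System.out.println(grouper.group(Set.of(0, 1, 2, 3)));
    }
}
```

Running it prints `{Partition 0=[0, 2], Partition 1=[1, 3]}`: four partitions folded into the original two tasks. A system whose expansion is not modulo-based would supply a different policy rather than a different grouper.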

Thanks!

- Yi

On Wed, May 31, 2017 at 10:33 AM, Dong Lin  wrote:

> Hey Navina,
>
> Thanks much for the comment. Please see my response below.
>
> Regarding your biggest gripe with the SEP, I personally think the
> operational requirements proposed in the SEP are pretty general and could
> be easily enforced by other systems. The reason is that the modulo
> operation is pretty standard and the default option when we choose a
> partition. And usually the underlying system allows the user to select an
> arbitrary partition number if it supports partition expansion. Do you know
> any system that does not meet these two requirements?
>
> Regarding your comment on the Motivation section, I renamed the first
> section to "Problem and Goal" and specified that "*The goal of this
> proposal is to enable partition expansion of the input streams*." I also
> put a sentence at the end of the Motivation section that "*The feature of
> task expansion is out of the scope of this proposal and will be addressed
> in a future SEP*". The second paragraph in the Motivation section is
> mainly used to explain the thinking process that we have gone through,
> what other alternatives we have considered, and what we plan to do in
> Samza in the next step.
>
> To answer your question of why increasing the partition number will
> increase the throughput of the Kafka consumer in the container: the Kafka
> consumer can potentially fetch more data in one FetchResponse with more
> partitions in the FetchRequest. This is because we limit the maximum
> amount of data that can be fetched for a given partition in the
> FetchResponse. By default this is set to 1 MB, and there are reasons why
> we cannot arbitrarily bump up this limit.
>
> To answer your question of how partition expansion in Kafka impacts the
> clients: the Kafka consumer is able to automatically detect new partitions
> of the topic and reassign all (both old and new) partitions across
> consumers in the consumer group IF you tell the consumer which topic to
> subscribe to. But the consumer in Samza's container uses another way of
> subscription. Instead of subscribing to the topic, the consumer in Samza's
> container subscribes to the specific partitions of the topic. In this
> case, if new partitions have been added, Samza will need to explicitly
> subscribe to the new partitions of the topic. The "Handle partition
> expansion while tasks are running" section in the SEP addresses this
> issue in Samza -- it recalculates the job model and restarts the
> containers so that the consumer can subscribe to the new partitions.
>
> I will ask other devs to take a look at the proposal. I will start the
> voting thread tomorrow if there is no further concern with the SEP.
>
> Thanks!
> Dong
>
>
> On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <
> nav...@apache.org>
> wrote:
>
> > Hey Dong,
> >
> > >  I have updated the motivation section to clarify this.
> >
> > Thanks fo

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-05-31 Thread Dong Lin
Hey Navina,

Thanks much for the comment. Please see my response below.

Regarding your biggest gripe with the SEP, I personally think the
operational requirements proposed in the SEP are pretty general and could be
easily enforced by other systems. The reason is that the modulo operation
is pretty standard and the default option when we choose a partition. And
usually the underlying system allows the user to select an arbitrary
partition number if it supports partition expansion. Do you know any system
that does not meet these two requirements?

Regarding your comment on the Motivation section, I renamed the first
section to "Problem and Goal" and specified that "*The goal of this
proposal is to enable partition expansion of the input streams*." I also
put a sentence at the end of the Motivation section that "*The feature of
task expansion is out of the scope of this proposal and will be addressed
in a future SEP*". The second paragraph in the Motivation section is mainly
used to explain the thinking process that we have gone through, what other
alternatives we have considered, and what we plan to do in Samza in the next
step.

To answer your question of why increasing the partition number will
increase the throughput of the Kafka consumer in the container: the Kafka
consumer can potentially fetch more data in one FetchResponse with more
partitions in the FetchRequest. This is because we limit the maximum amount
of data that can be fetched for a given partition in the FetchResponse. By
default this is set to 1 MB, and there are reasons why we cannot
arbitrarily bump up this limit.
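The arithmetic behind this answer, as a small Java sketch: if each partition can contribute at most a fixed number of bytes to one FetchResponse, the ceiling on a single response grows linearly with the number of partitions in the request. The 1 MB figure comes from the paragraph above (in the Java consumer this per-partition limit is the `max.partition.fetch.bytes` setting); the optional per-request cap is an assumption meant to model the overall `fetch.max.bytes` limit that newer consumers also apply. The class name is hypothetical, and the result is only an upper bound: it matters only when the consumer is actually limited by fetch size rather than by how much data is available.

```java
/**
 * Hypothetical back-of-the-envelope bound on the data a single fetch can return when
 * the broker caps the bytes returned per partition.
 */
final class FetchSizeBound {

    static final long ONE_MB = 1024L * 1024L;

    /**
     * @param partitions      partitions included in one fetch request
     * @param perPartitionCap bytes the broker may return per partition
     * @param perRequestCap   overall cap on the response (Long.MAX_VALUE for none)
     */
    static long maxBytesPerFetch(int partitions, long perPartitionCap, long perRequestCap) {
        return Math.min(partitions * perPartitionCap, perRequestCap);
    }

    public static void main(String[] args) {
        for (int partitions : new int[] {1, 2, 4, 8}) {
            long bound = maxBytesPerFetch(partitions, ONE_MB, Long.MAX_VALUE);
            System.out.println(partitions + " partition(s): at most " + bound / ONE_MB + " MB per fetch");
        }
    }
}
```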

To answer your question of how partition expansion in Kafka impacts the
clients: the Kafka consumer is able to automatically detect new partitions
of the topic and reassign all (both old and new) partitions across consumers
in the consumer group IF you tell the consumer which topic to subscribe to.
But the consumer in Samza's container uses another way of subscription.
Instead of subscribing to the topic, the consumer in Samza's container
subscribes to the specific partitions of the topic. In this case, if new
partitions have been added, Samza will need to explicitly subscribe to the
new partitions of the topic. The "Handle partition expansion while tasks are
running" section in the SEP addresses this issue in Samza -- it recalculates
the job model and restarts the containers so that the consumer can
subscribe to the new partitions.
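A stdlib-only Java sketch of the check this implies. With the real Kafka client, a topic-level `subscribe(...)` lets the group coordinator hand out new partitions, while `assign(...)` pins the consumer to an explicit partition list; the topic's current partition count could be read with `partitionsFor(topic)`. The class below is hypothetical and simply models that comparison with plain ints: it reports partitions that exist in the topic but appear nowhere in the job model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch: with partition-level assignment nothing tells the job that the
 * topic grew, so the job has to compare what it assigned with the topic's current size.
 */
final class ExpansionDetector {

    /**
     * @param assignedAcrossJob     partitions of the topic assigned to any task in the job model
     * @param currentPartitionCount partition count currently reported for the topic
     * @return partitions that exist but are not consumed by any task yet
     */
    static List<Integer> unassignedPartitions(Set<Integer> assignedAcrossJob, int currentPartitionCount) {
        List<Integer> missing = new ArrayList<>();
        for (int p = 0; p < currentPartitionCount; p++) {
            if (!assignedAcrossJob.contains(p)) {
                missing.add(p);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Set<Integer> assigned = Set.of(0, 1); // job model computed before expansion
        int partitionCountNow = 4;            // topic doubled from 2 to 4 partitions
        List<Integer> missing = unassignedPartitions(assigned, partitionCountNow);
        if (!missing.isEmpty()) {
            // Per the SEP, this is where the job model would be recalculated and containers restarted.
            System.out.println("new partitions to pick up: " + missing);
        }
    }
}
```

Here it prints `new partitions to pick up: [2, 3]`. The comparison is job-wide on purpose: a single container only holds a subset of the partitions, so checking one container's assignment would wrongly flag partitions owned by other containers.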

I will ask other devs to take a look at the proposal. I will start the
voting thread tomorrow if there is no further concern with the SEP.

Thanks!
Dong


On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) 
wrote:

> Hey Dong,
>
> >  I have updated the motivation section to clarify this.
>
> Thanks for updating the motivation. A couple of notes here:
>
> 1.
> > "The motivation of increasing partition number of Kafka topic includes 1)
> > limit the maximum size of a partition in order to improve broker
> > performance and 2) increase throughput of Kafka consumer in the Samza
> > container."
>
> It's unclear to me how increasing the partition number will increase the
> throughput of the Kafka consumer in the container. Theoretically, you will
> still be consuming the same amount of data in the container, irrespective
> of whether it is coming from one partition or from more than one expanded
> partition. Can you please explain what you mean by that?
>
> 2. I believe the second paragraph under motivation is simply talking about
> the scope of the current SEP. It would be easier to read if it stated which
> solution is included in this SEP and what is left out of scope (for
> example, whether expansion for stateful jobs is supported or not).
>
> > We need to persist the task-to-sspList mapping in the
> > coordinator stream so that the job can derive the original number of
> > partitions of each input stream regardless of how many times the partition
> > has expanded. Does this make sense?
>
> Yes. It does!
>
> > I am not sure how this is related to the locality though. Can you clarify
> > your question if I haven't answered your question?
>
> It's not related. I just meant to give an example of yet another
> coordinator message that is persisted. Your ssp-to-task mapping is
> following a similar pattern for persisting. Just wanted to clarify that.
>
> > Can you let me know if this, together with the answers in the previous
> > email, addresses all your questions?
>
> Yes. I believe you have addressed most of my questions. Thanks for taking
> time to do that.
>
> > Is there a specific question you have regarding partition
> > expansion in Kafka?
>
> I guess my questions are on how partition expansion in Kafka impacts the
> clients. IIUC, partition expansions are done manually in Kafka based on the
> bytes-in rate of the partition. Do the existing Kafka clients handle this
> expansion automatically? If yes, how does it work? If not, are there plans
> to support it in the future?
>
> > Thus user's job should not need to bootstrap key/value store from the
> > changelog topic.

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-05-31 Thread Navina Ramesh (Apache)
Hey Dong,

>  I have updated the motivation section to clarify this.

Thanks for updating the motivation. A couple of notes here:

1.
> "The motivation of increasing partition number of Kafka topic includes 1)
> limit the maximum size of a partition in order to improve broker
> performance and 2) increase throughput of Kafka consumer in the Samza
> container."

It's unclear to me how increasing the partition number will increase the
throughput of the Kafka consumer in the container. Theoretically, you will
still be consuming the same amount of data in the container, irrespective
of whether it is coming from one partition or from more than one expanded
partition. Can you please explain what you mean by that?

2. I believe the second paragraph under motivation is simply talking about
the scope of the current SEP. It would be easier to read if it stated which
solution is included in this SEP and what is left out of scope (for
example, whether expansion for stateful jobs is supported or not).

> We need to persist the task-to-sspList mapping in the
> coordinator stream so that the job can derive the original number of
> partitions of each input stream regardless of how many times the partition
> has expanded. Does this make sense?

Yes. It does!

> I am not sure how this is related to the locality though. Can you clarify
> your question if I haven't answered your question?

It's not related. I just meant to give an example of yet another
coordinator message that is persisted. Your ssp-to-task mapping is
following a similar pattern for persisting. Just wanted to clarify that.

> Can you let me know if this, together with the answers in the previous
> email, addresses all your questions?

Yes. I believe you have addressed most of my questions. Thanks for taking
time to do that.

> Is there a specific question you have regarding partition
> expansion in Kafka?

I guess my questions are on how partition expansion in Kafka impacts the
clients. IIUC, partition expansions are done manually in Kafka based on the
bytes-in rate of the partition. Do the existing Kafka clients handle this
expansion automatically? If yes, how does it work? If not, are there plans
to support it in the future?

> Thus user's job should not need to bootstrap key/value store from the
> changelog topic.

Why is this discussion relevant here? The key/value store and its changelog
topic partition are scoped to a task. Since we are not changing the number
of tasks, I don't think it needs to be mentioned here.

> The new method takes the SystemStreamPartition-to-Task assignment from
> the previous job model which can be read from the coordinator stream.

JobModel is currently not persisted to the coordinator stream. In your
design, you talk about writing separate coordinator messages for ssp-to-task
assignments. Hence, please correct this statement. It is kind of misleading
to the reader.

My biggest gripe with this SEP is that it seems like a tailor-made solution
that relies on the semantics of the Kafka system and yet, we are trying to
masquerade that as operational requirements for other systems interacting
with Samza. (Not to say that this is the first time such a choice is being
made in the Samza design). I am not seeing how this can be a "general"
solution for all input systems. That's my two cents. I would like to hear
alternative points of view for this from other devs.

Please make sure you have enough eyes on this SEP. If you do, please start
a VOTE thread to approve this SEP.

Thanks!
Navina


On Mon, May 29, 2017 at 12:32 AM, Dong Lin  wrote:

> Hey Navina,
>
> I have updated the wiki based on your suggestion. More specifically, I have
> made the following changes:
>
> - Improved the Problem section and Motivation section to describe why we
> use the solution in this proposal instead of tackling the problem of task
> expansion directly.
>
> - Illustrated the design in a way that doesn't bind to Kafka. Kafka is
> only used as an example to illustrate why we want partition expansion and
> whether the operational requirement can be supported when Kafka is used as
> the input system. Note that the proposed solution should work for any input
> system that meets the operational requirement described in the wiki.
>
> - Fixed the problem in the figure.
>
> - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki.
> Together with GroupByPartitionFixedTaskNum, it should ensure that we have a
> solution to enable partition expansion for all users that are using
> pre-defined groupers in Samza. Note that users who use a custom grouper
> would need to update their implementation.
>
> Can you let me know if this, together with the answers in the previous
> email, addresses all your questions? Thanks for taking time to review the
> proposal.
>
> Regards,
> Dong
>
>
>
>
>
>
>
> On Wed, May 24, 2017 at 11:15 PM, Dong Lin  wrote:
>
> > Hey Navina,
> >
> > Thanks much for your comments. Please see my reply inline.
> >
> > On Wed, May 24, 2017 at 10:22 AM, 

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-05-29 Thread Dong Lin
Hey Navina,

I have updated the wiki based on your suggestion. More specifically, I have
made the following changes:

- Improved the Problem section and Motivation section to describe why we use
the solution in this proposal instead of tackling the problem of task
expansion directly.

- Illustrated the design in a way that doesn't bind to Kafka. Kafka is only
used as an example to illustrate why we want partition expansion and
whether the operational requirement can be supported when Kafka is used as
the input system. Note that the proposed solution should work for any input
system that meets the operational requirement described in the wiki.

- Fixed the problem in the figure.

- Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki.
Together with GroupByPartitionFixedTaskNum, it should ensure that we have a
solution to enable partition expansion for all users that are using
pre-defined groupers in Samza. Note that users who use a custom grouper
would need to update their implementation.
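For the per-SSP grouper, a hypothetical Java sketch of how the fixed-task idea could carry over: GroupBySystemStreamPartition gives every (stream, partition) pair its own task, so a fixed-task variant would fold each new partition back onto the task of the original partition it expanded from. The `Ssp` record, the `stream-partition` task names, and the per-stream original partition counts passed in as a map are all assumptions for illustration; in the proposal those counts would be derived from the mapping persisted in the coordinator stream. The record needs Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of a fixed-task variant of per-SSP grouping: each partition is
 * folded back onto the task of its original partition, so the task count never changes.
 */
final class SspFixedTaskGrouping {

    /** Stands in for Samza's SystemStreamPartition. */
    record Ssp(String stream, int partition) {}

    static Map<String, List<Ssp>> group(List<Ssp> ssps, Map<String, Integer> originalPartitionCount) {
        Map<String, List<Ssp>> tasks = new TreeMap<>();
        for (Ssp ssp : ssps) {
            Integer original = originalPartitionCount.get(ssp.stream());
            if (original == null) {
                throw new IllegalArgumentException("no original partition count for " + ssp.stream());
            }
            // Assumes modulo hashing plus doubling, so partition p expanded from p mod original.
            String taskName = ssp.stream() + "-" + (ssp.partition() % original);
            tasks.computeIfAbsent(taskName, k -> new ArrayList<>()).add(ssp);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<Ssp> ssps = new ArrayList<>();
        for (int p = 0; p < 4; p++) {
            ssps.add(new Ssp("page-views", p)); // expanded from 2 to 4 partitions
        }
        for (int p = 0; p < 3; p++) {
            ssps.add(new Ssp("clicks", p));     // never expanded
        }
        Map<String, List<Ssp>> tasks = group(ssps, Map.of("page-views", 2, "clicks", 3));
        tasks.forEach((task, assigned) -> System.out.println(task + " -> " + assigned));
    }
}
```

The example yields five tasks, the same number GroupBySystemStreamPartition would have produced before `page-views` was expanded.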

Can you let me know if this, together with the answers in the previous
email, addresses all your questions? Thanks for taking time to review the
proposal.

Regards,
Dong







On Wed, May 24, 2017 at 11:15 PM, Dong Lin  wrote:

> Hey Navina,
>
> Thanks much for your comments. Please see my reply inline.
>
> On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <
> nav...@apache.org> wrote:
>
>> Thanks for the SEP, Dong. I have a couple of questions to understand your
>> proposal better:
>>
>> * Under motivation, you mention that "_We expect this solution to work
>> similarly with other input system as well._", yet I don't see any
>> discussion on how it will work with other input systems. That is, what
>> kind of contract does Samza expect from other input systems? If we are
>> not planning to provide a generic solution, it might be worth calling it
>> out in the SEP.
>>
>
> I think the contract we expect from other systems consists of exactly the
> operational requirements mentioned in the SEP, i.e. partitions should
> always be doubled and the hash algorithm should take the hash modulo the
> number of partitions. SEP-5 should also allow partition expansion of all
> input systems that meet these two requirements. I have updated the
> motivation section to clarify this.
>
>
>>
>> * I understand the partition mapping logic you have proposed. But I think
>> the example explanation doesn't match the diagram. In the diagram, after
>> expansion, partition-0 and partition-1 are pointing to bucket 0 and
>> partition-3 and partition-4 are pointing to bucket 1. I think the former
>> has to be partition-0 and partition-2 and the latter partition-1 and
>> partition-3. If I am wrong, please help me understand the logic :)
>>
>
> Good catch. I will update the figure to fix this problem.
>
>
>>
>> * I don't know how partition expansion in Kafka works. I am familiar with
>> how shard splitting happens in Kinesis - there is a hierarchical relation
>> between the parent and child shards. This way, it will also allow the
>> shards to be merged back. IIUC, Kafka only supports partition "expansion",
>> as opposed to "splits". Can you provide some context or link related to
>> how partition expansion works in Kafka?
>>
>
> I couldn't find any wiki on partition expansion in Kafka. The partition
> expansion logic in Kafka is very simple -- it just adds new partitions to
> the existing topic. Is there a specific question you have regarding
> partition expansion in Kafka?
>
>
>>
>> * Are you only recommending that expansion can be supported for Samza jobs
>> that use Kafka as input systems **and** configure the SSPGrouper as
>> GroupByPartitionFixedTaskNum? Sounds to me like this only applies to
>> GroupByPartition. Please correct me if I am wrong. What is the expectation
>> for custom SSP Groupers?
>>
>
> The expansion can be supported for Samza jobs if the input system meets
> the operational requirement mentioned above. It doesn't have to use Kafka
> as the input system.
>
> The current proposal provides a solution for jobs that currently use
> GroupByPartition. The proposal can be extended to support jobs that use
> other groupers that are pre-defined in Samza. A custom SSP grouper needs
> to handle partition expansion similarly to how GroupByPartitionFixedTaskNum
> handles it, and it is the user's responsibility to update their custom
> grouper implementation.
>
>
>>
>> * Regarding storing the SSP-to-Task assignment in the coordinator stream:
>> Today, the JobModel encapsulates the data model in Samza, which also
>> includes **TaskModels**. A TaskModel typically shows the task-to-sspList
>> mapping. What is the reason for using a separate coordinator stream
>> message *SetSSPTaskMapping*? Is it because the JobModel itself is not
>> persisted in the coordinator stream today?  The reason locality exists
>> outside of the jobmodel is because *locality* information is written by
>> each container, whereas it is consumed only by the leader jobcoordinat

Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-05-24 Thread Dong Lin
Hey Navina,

Thanks much for your comments. Please see my reply inline.

On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) 
wrote:

> Thanks for the SEP, Dong. I have a couple of questions to understand your
> proposal better:
>
> * Under motivation, you mention that "_We expect this solution to work
> similarly with other input system as well._", yet I don't see any
> discussion on how it will work with other input systems. That is, what kind
> of contract does Samza expect from other input systems? If we are not
> planning to provide a generic solution, it might be worth calling it out in
> the SEP.
>

I think the contract we expect from other systems consists of exactly the
operational requirements mentioned in the SEP, i.e. partitions should always
be doubled and the hash algorithm should take the hash modulo the number of
partitions. SEP-5 should also allow partition expansion of all input systems
that meet these two requirements. I have updated the motivation section to
clarify this.


>
> * I understand the partition mapping logic you have proposed. But I think
> the example explanation doesn't match the diagram. In the diagram, after
> expansion, partition-0 and partition-1 are pointing to bucket 0 and
> partition-3 and partition-4 are pointing to bucket 1. I think the former
> has to be partition-0 and partition-2 and the latter partition-1 and
> partition-3. If I am wrong, please help me understand the logic :)
>

Good catch. I will update the figure to fix this problem.


>
> * I don't know how partition expansion in Kafka works. I am familiar with
> how shard splitting happens in Kinesis - there is a hierarchical relation
> between the parent and child shards. This way, it will also allow the
> shards to be merged back. IIUC, Kafka only supports partition "expansion",
> as opposed to "splits". Can you provide some context or link related to how
> partition expansion works in Kafka?
>

I couldn't find any wiki on partition expansion in Kafka. The partition
expansion logic in Kafka is very simple -- it just adds new partitions to
the existing topic. Is there a specific question you have regarding
partition expansion in Kafka?


>
> * Are you only recommending that expansion can be supported for Samza jobs
> that use Kafka as input systems **and** configure the SSPGrouper as
> GroupByPartitionFixedTaskNum? Sounds to me like this only applies to
> GroupByPartition. Please correct me if I am wrong. What is the expectation
> for custom SSP Groupers?
>

The expansion can be supported for Samza jobs if the input system meets the
operational requirements mentioned above. It doesn't have to use Kafka as the
input system.

The current proposal provides a solution for jobs that currently use
GroupByPartition. The proposal can be extended to support jobs that use
other groupers that are predefined in Samza. A custom SSP grouper needs to
handle partition expansion similarly to how GroupByPartitionFixedTaskNum
handles it, and it is the user's responsibility to update their custom
grouper implementation.
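As an illustration of what handling partition expansion could mean for a custom grouper, here is a hypothetical sketch that assigns every SSP to the task owning `partition mod originalCount` for its stream. It does not use Samza's actual `SystemStreamPartitionGrouper` interface: `Ssp`, `group` and the per-stream `originalCounts` map are simplified stand-ins, and the `Partition N` task names are assumed to mirror GroupByPartition's naming.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of a fixed-task grouper; Ssp is a simplified stand-in for
// Samza's SystemStreamPartition and task names are plain strings.
public final class FixedTasksGrouperSketch {

    public static final class Ssp {
        final String stream;
        final int partition;

        public Ssp(String stream, int partition) {
            this.stream = stream;
            this.partition = partition;
        }
    }

    // Assigns each SSP to the task that owned its original partition, so the
    // number of tasks (and their state) stays fixed after expansion.
    public static Map<String, Set<String>> group(List<Ssp> ssps, Map<String, Integer> originalCounts) {
        Map<String, Set<String>> tasks = new TreeMap<>();
        for (Ssp ssp : ssps) {
            Integer original = originalCounts.get(ssp.stream);
            if (original == null) {
                throw new IllegalArgumentException("unknown original count for " + ssp.stream);
            }
            String task = "Partition " + (ssp.partition % original);
            tasks.computeIfAbsent(task, k -> new TreeSet<>()).add(ssp.stream + "-" + ssp.partition);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // "events" was expanded from 2 to 4 partitions; "clicks" still has 2.
        List<Ssp> ssps = List.of(
            new Ssp("events", 0), new Ssp("events", 1), new Ssp("events", 2), new Ssp("events", 3),
            new Ssp("clicks", 0), new Ssp("clicks", 1));
        Map<String, Integer> originalCounts = Map.of("events", 2, "clicks", 2);
        System.out.println(group(ssps, originalCounts));
    }
}
```

With `events` expanded from 2 to 4 partitions and `clicks` left at 2, `main` prints `{Partition 0=[clicks-0, events-0, events-2], Partition 1=[clicks-1, events-1, events-3]}`, so the job keeps exactly two tasks.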


>
> * Regarding storing SSP-to-Task assignment to coordinator stream: Today,
> the JobModel encapsulates the data model in Samza which also includes
> **TaskModels**. TaskModel typically shows the task-to-sspList mapping.
> What is the reason for using a separate coordinator stream message
> *SetSSPTaskMapping*? Is it because the JobModel itself is not persisted in
> the coordinator stream today? The reason locality exists outside of the
> jobmodel is that *locality* information is written by each container,
> whereas it is consumed only by the leader jobcoordinator/AM. In this case,
> the writer of the mapping information and the reader is still the leader
> jobcoordinator/AM. So, I want to understand the motivation for this choice.
>

Yes, the reason for using a separate coordinator stream message is that
the task-to-sspList mapping is not currently persisted in the coordinator
stream. We wouldn't need to create this new message if the JobModel were
persisted. We need to persist the task-to-sspList mapping in the
coordinator stream so that the job can derive the original number of
partitions of each input stream regardless of how many times the stream
has been expanded. Does this make sense?
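A sketch of how the original partition count might be recovered from a persisted SSP-to-task mapping. It assumes GroupByPartition-style grouping, in which each original partition of a stream got its own task and new partitions are mapped onto existing tasks, so the original count equals the number of distinct tasks that a stream's SSPs map to. The `stream#partition` key encoding and the `OriginalPartitionCount` class are hypothetical; the actual *SetSSPTaskMapping* message format may differ.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; SSP keys use an assumed "stream#partition" encoding.
public final class OriginalPartitionCount {

    private OriginalPartitionCount() {}

    // For each stream, counts the distinct tasks its SSPs are assigned to.
    public static Map<String, Integer> originalCounts(Map<String, String> sspToTask) {
        Map<String, Set<String>> tasksPerStream = new HashMap<>();
        for (Map.Entry<String, String> entry : sspToTask.entrySet()) {
            String ssp = entry.getKey();
            int sep = ssp.lastIndexOf('#');
            if (sep < 0) {
                throw new IllegalArgumentException("malformed SSP key: " + ssp);
            }
            String stream = ssp.substring(0, sep);
            tasksPerStream.computeIfAbsent(stream, k -> new HashSet<>()).add(entry.getValue());
        }
        Map<String, Integer> counts = new HashMap<>();
        tasksPerStream.forEach((stream, tasks) -> counts.put(stream, tasks.size()));
        return counts;
    }

    public static void main(String[] args) {
        Map<String, String> sspToTask = new HashMap<>();
        sspToTask.put("events#0", "Partition 0");
        sspToTask.put("events#1", "Partition 1");
        sspToTask.put("events#2", "Partition 0"); // added by expansion
        sspToTask.put("events#3", "Partition 1"); // added by expansion
        System.out.println(originalCounts(sspToTask).get("events")); // 2
    }
}
```

Even after `events` grows from 2 to 4 partitions, its four SSPs still map to two distinct tasks, so the derived original count stays 2.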

I am not sure how this is related to locality, though. Could you clarify
your question if I haven't answered it?

Thanks!
Dong


>
> Cheers!
> Navina
>
> On Tue, May 23, 2017 at 5:45 PM, Dong Lin  wrote:
>
> > Hi all,
> >
> > We created SEP-5: Enable partition expansion of input streams. Please find
> > the SEP wiki in the link
> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> > .
> >
> > Your feedback is appreciated!
> >
> > Thanks,
> > Dong
> >
>


Re: [DISCUSS] SEP-5: Enable partition expansion of input streams

2017-05-24 Thread Navina Ramesh (Apache)
Thanks for the SEP, Dong. I have a couple of questions to understand your
proposal better:

* Under motivation, you mention that "_We expect this solution to work
similarly with other input system as well._", yet I don't see any
discussion on how it will work with other input systems. That is, what kind
of contract does Samza expect from other input systems? If we are not
planning to provide a generic solution, it might be worth calling it out in
the SEP.

* I understand the partition mapping logic you have proposed. But I think
the example explanation doesn't match the diagram. In the diagram, after
expansion, partition-0 and partition-1 are pointing to bucket 0 and
partition-3 and partition-4 are pointing to bucket 1. I think the former
has to be partition-0 and partition-2, and the latter partition-1 and
partition-3. If I am wrong, please help me understand the logic :)

* I don't know how partition expansion in Kafka works. I am familiar with
how shard splitting happens in Kinesis: there is a hierarchical relation
between the parent and child shards. This way, it will also allow the
shards to be merged back. IIUC, Kafka only supports partition "expansion",
as opposed to "splits". Can you provide some context or link related to how
partition expansion works in Kafka?

* Are you only recommending that expansion can be supported for Samza jobs
that use Kafka as input systems **and** configure the SSPGrouper as
GroupByPartitionFixedTaskNum? Sounds to me like this only applies for
GroupByPartition. Please correct me if I am wrong. What is the expectation
for custom SSP Groupers?

* Regarding storing SSP-to-Task assignment to coordinator stream: Today,
the JobModel encapsulates the data model in Samza which also includes
**TaskModels**. TaskModel typically shows the task-to-sspList mapping.
What is the reason for using a separate coordinator stream message
*SetSSPTaskMapping*? Is it because the JobModel itself is not persisted in
the coordinator stream today? The reason locality exists outside of the
jobmodel is that *locality* information is written by each container,
whereas it is consumed only by the leader jobcoordinator/AM. In this case,
the writer of the mapping information and the reader is still the leader
jobcoordinator/AM. So, I want to understand the motivation for this choice.

Cheers!
Navina

On Tue, May 23, 2017 at 5:45 PM, Dong Lin  wrote:

> Hi all,
>
> We created SEP-5: Enable partition expansion of input streams. Please find
> the SEP wiki in the link
> https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> .
>
> Your feedback is appreciated!
>
> Thanks,
> Dong
>


[DISCUSS] SEP-5: Enable partition expansion of input streams

2017-05-23 Thread Dong Lin
Hi all,

We created SEP-5: Enable partition expansion of input streams. Please find
the SEP wiki in the link
https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
.

Your feedback is appreciated!

Thanks,
Dong