[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922911#comment-15922911
 ] 

Matthias J. Sax commented on KAFKA-4835:


Agreed. I just wanted to point out, that there a different scenarios. This JIRA 
make a lot of sense for "re-keying that preserves partitioning" because there a 
(conceptually) multiple primary keys in the data, and you just want to set a 
different primary key.

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-13 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907523#comment-15907523
 ] 

Michal Borowiecki commented on KAFKA-4835:
--

My point above was that if the customerRef (which is what we partition by) was 
part of the key (not the whole key) then we'd still need to modify the key for 
the purpose of the join operation.
We'd need to do that for both streams, even though they would both be 
partitioned by the same part of the key, hence the re-partitioning (forced 
automatically by kafka streams) would be totally unnecessary.

In more generic terms, I think this can be a common use case. Let' consider it 
using DDD concepts. We have an aggregate comprised of multiple entities. We 
send messages for each entity (not the whole aggregate) but to ensure 
sequential processing for entities belonging to the same aggregate, the 
messages are partitioned by the aggregate id. The entity id is still important, 
especially for compacted topics it would be needed for deletion markers, as the 
key is all there is in that case. Hence it comes naturally to compose the 
message key as ::
Then, if you want to join two such streams by aggregate id, you should be able 
to do it without repartitioning (since both partitioned by the aggregate-id 
part of the msg key). However, since joins are only supported on the whole msg 
key, you're forced to re-map the key to just  prior to the join 
which in turn currently forces repartitioning.

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-10 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905515#comment-15905515
 ] 

Matthias J. Sax commented on KAFKA-4835:


If you use a custom partitioner that is solely based on the message key, there 
is not problem like this. As long as you don't modify the key, no 
repartitioning will happen. And if you modify the key (which does require a 
repartitioning), you can give your custom partitioner to Streams, too. You only 
can't enjoy auto-repartitioning, but need to do all re-partitioning manually 
via {{through(StreamPartitionier partitoner, String topic)}}.

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-10 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904785#comment-15904785
 ] 

Michal Borowiecki commented on KAFKA-4835:
--

Yes, that's the case. Message key is a concatenation of activity type + 
activity id but the partitioning is done on the customer.

NB. I don't think it was wise for us to not put the key partitioned on in the 
msg key, however, that ship has sailed, I'm afraid. 
However, my understanding is that even if the partitioning key was part of the 
msg key (e.g. activity type + customerRef + activity id), we'd still be using a 
custom partitioner and we'd still have this issue.

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-10 Thread Michael Noll (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904709#comment-15904709
 ] 

Michael Noll commented on KAFKA-4835:
-

Just to clarify:

{quote}
However, in our case I know that both input streams are already partitioned by 
the customerRef value, which I'm mapping into the key (because it's required by 
the join operation).
[...]
(Note, we don't have the option to re-implement the original input streams to 
make customerRef the message key.)
{quote}

So this means a custom partitioner is being used upstream to route the original 
data records into partitions, and that custom partitioner makes its routing 
decisions based on the payload in the value, which contains the customerRef 
field?

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-03 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894817#comment-15894817
 ] 

Matthias J. Sax commented on KAFKA-4835:


I guess, we need API change for this, and thus, a KIP is required.

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)