[
https://issues.apache.org/jira/browse/KAFKA-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17249971#comment-17249971
]
Matthias J. Sax edited comment on KAFKA-10844 at 12/15/20, 9:59 PM:
--------------------------------------------------------------------
For (1) (or KAFKA-4835), it's actually an easy thing to fix; the demand was
just not big and nobody picked it up. Btw: I would not change `groupBy()` for
this case, but rather let people rewrite to
`selectKey(...).reinterpretAsKeyedStream().groupByKey()`, which keeps the API
surface area smaller.
I guess one tricky question we need to keep in mind, though, is the use of IQ
(interactive queries): while the partitioning might be ok for the
aggregation/join downstream, it would still break IQ, as the new key might
compute a different hash and thus would map to the wrong instance.
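
To illustrate the IQ concern, here is a minimal, self-contained sketch. It
does not use the Kafka Streams API: `PartitionSketch`, `partitionFor`, the
`"|"` key separator, and the use of `String.hashCode()` are all assumptions
made for illustration (Kafka's default partitioner hashes the serialized key
bytes differently). It only shows that a record kept in its device partition
can hash to a different partition once the key is (device, metric).

```java
// Illustrative only: a stand-in for hash partitioning, not Kafka's partitioner.
final class PartitionSketch {
    // Hypothetical helper: map a key to one of numPartitions via its hash.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Where a (device, metric) record actually lives if the stream is re-keyed
    // without repartitioning: still in the partition chosen by the device id.
    static int actualPartition(String device, String metric, int numPartitions) {
        return partitionFor(device, numPartitions);
    }

    // Where a lookup that hashes the full new key would expect the record.
    static int lookupPartition(String device, String metric, int numPartitions) {
        return partitionFor(device + "|" + metric, numPartitions);
    }

    // Count how many metrics of one device a hash-based lookup would misroute.
    static int countMisrouted(String device, String[] metrics, int numPartitions) {
        int misrouted = 0;
        for (String metric : metrics) {
            if (actualPartition(device, metric, numPartitions)
                    != lookupPartition(device, metric, numPartitions)) {
                misrouted++;
            }
        }
        return misrouted;
    }
}
```

Any non-zero result from `countMisrouted` means a key-based lookup would ask
the wrong instance, so a "no shuffle" rewrite would presumably also need a way
to route queries by the original key.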
For (2), as we process data in timestamp order, the skew should not become too
big, and setting a larger grace-period and retention-time should help to
mitigate the issue. It's a more complex problem to address, but we have some
ideas and might tackle it in 2021...
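
As a rough sketch of why a larger grace period helps, the following models a
single partition in which stream-time is the highest timestamp seen so far.
`WindowExpirySketch` is a hypothetical class, and the rule that a window stays
open while window end plus grace is greater than stream-time is a simplifying
assumption, not Kafka Streams code.

```java
// Simplified model of event-time window expiry; an illustration only.
final class WindowExpirySketch {
    private final long windowSizeMs;
    private final long graceMs;
    private long streamTimeMs = Long.MIN_VALUE; // highest timestamp seen so far

    WindowExpirySketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Returns true if the record is still accepted into its window, false if
    // the window is considered closed and the record would be dropped.
    boolean accept(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        // Assumed rule: the window stays open until stream-time reaches
        // windowEnd + grace.
        return windowEnd + graceMs > streamTimeMs;
    }
}
```

With one-minute windows and no grace, a record from a fast task at one hour
followed by a record from a slow task at ten seconds leads to the second
record being rejected. With a grace period of two hours, the same sequence is
accepted.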
> groupBy without shuffling
> -------------------------
>
> Key: KAFKA-10844
> URL: https://issues.apache.org/jira/browse/KAFKA-10844
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.6.0
> Reporter: Mathieu DESPRIEE
> Priority: Major
> Labels: needs-kip
>
> The idea is to give a way to keep the current partitioning while doing a
> groupBy.
> Our use-case is the following:
> We process device data (stream is partitioned by device-id), each device
> produces several metrics. We want to aggregate by metric, so currently we do a
> {code:java}
> selectKey( ... => (device, metric)).groupByKey.windowedBy(...).aggregate(...)
> {code}
> This shuffles the data around, but that is not necessary: each (device,
> metric) group could stay in the original partition.
> This is not only an optimization question. We are experiencing invalid
> aggregations when reprocessing history. During these reprocessing runs, we
> frequently see some tasks moving faster on some partitions. This causes
> problems with event-time: Let's say data for device d1 is in partition p1 at
> stream-time t1, and data for device d2 is in partition p2 at stream-time t2.
> Now, if I re-key by (device, metric), records from both devices could have
> the same hash-key and land in the same partition. And if t2 is far ahead of
> t1, then all time-windows for t1 get expired at once.
> Maybe I am missing a way of doing this with the existing API; please let me
> know. Currently, I manually repartition and specify a custom partitioner, but
> it's tedious.
> If I were to rewrite the aggregations manually with the Transformer API, I
> would use (device, key) for my state store key, without changing the record
> key.
>
> _(poke_ [~vvcephei] _following our discussion on users ml)_