[ 
https://issues.apache.org/jira/browse/KAFKA-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17252759#comment-17252759
 ] 

Mathieu DESPRIEE edited comment on KAFKA-10844 at 12/21/20, 11:00 AM:
----------------------------------------------------------------------

[~mjsax] Thanks for the answer.
I can confirm a very high skew when I reprocess history: I have already seen a 
spread of several *days* (in event-time) between tasks.
Changing the grace periods when reprocessing history could be a workaround, but 
only if the skew is less than a couple of hours. Beyond that, I would need to 
wait a long time after reprocessing to get the final aggregations.

There are probably things I could tune to reduce that spread but, in any case, 
it does not feel right to have windows expiring like that. Ultimately, 
aggregation correctness should not depend on config, IMHO.

Frankly, I don't understand how one can reprocess history correctly as soon as 
there is a shuffle in the topology.
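
A minimal, self-contained sketch of the effect described here, in plain Java with 
no Kafka Streams dependency. It assumes a simplified model: one task keeps a 
single stream-time (the highest timestamp seen so far), windows are tumbling, 
and a record is dropped once its window end plus the grace period is at or 
behind stream-time. The class and method names are hypothetical and only serve 
the illustration.

```java
class SkewDemo {
    /**
     * Counts records dropped as late when one task consumes the given
     * timestamps in order. Simplified model (an assumption, not the actual
     * Kafka Streams code): non-negative timestamps, tumbling windows, and a
     * single stream-time per task.
     */
    static int droppedRecords(long[] timestampsMs, long windowSizeMs, long graceMs) {
        long streamTime = Long.MIN_VALUE;
        int dropped = 0;
        for (long ts : timestampsMs) {
            // Stream-time only moves forward.
            streamTime = Math.max(streamTime, ts);
            long windowEnd = ts - (ts % windowSizeMs) + windowSizeMs;
            // The window is closed once stream-time has passed its end plus grace.
            if (windowEnd + graceMs <= streamTime) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long hour = 60 * min;
        long day = 24 * hour;

        // Device d1 alone, as in its original partition.
        long[] d1Only = {0, 10 * min, 20 * min};
        // After the shuffle: d1 interleaved with d2, which is two days ahead.
        long[] interleaved = {0, 2 * day, 10 * min, 2 * day + 10 * min, 20 * min};

        System.out.println(droppedRecords(d1Only, hour, hour));          // 0
        System.out.println(droppedRecords(interleaved, hour, hour));     // 2
        System.out.println(droppedRecords(interleaved, hour, 3 * day));  // 0
    }
}
```

In this model the last call shows why a larger grace period only helps when it 
exceeds the skew, which in turn delays the final results by that much.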

I'm very interested in your 2021 plans about that. Is there any KIP or 
discussion I could already read?




> groupBy without shuffling
> -------------------------
>
>                 Key: KAFKA-10844
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10844
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Mathieu DESPRIEE
>            Priority: Major
>              Labels: needs-kip
>
> The idea is to give a way to keep the current partitioning while doing a 
> groupBy.
> Our use-case is the following:
> We process device data (the stream is partitioned by device-id), and each 
> device produces several metrics. We want to aggregate by metric, so currently 
> we do a
> {code:java}
> selectKey( ... => (device, metric)).groupByKey.windowedBy(...).aggregate(...)
> {code}
> This shuffles the data around, but that is not necessary: each (device, 
> metric) group could stay in the original partition.
> This is not only an optimization question. We are experiencing invalid 
> aggregations when reprocessing history. During reprocessing, we frequently 
> see tasks moving faster on some partitions than on others. This causes 
> problems with event-time. Let's say data for device d1 is in partition p1 at 
> stream-time t1, and data for device d2 is in partition p2 at stream-time t2.
> Now, if I re-key by (device, metric), records from both devices could hash 
> to the same partition. And if t2 is far ahead of t1, then all time-windows 
> for t1 expire at once.
> Maybe I am missing a way of doing this with the existing API; please let me 
> know. Currently, I manually repartition and specify a custom partitioner, but 
> it's tedious.
> If I were to rewrite the aggregations manually with the Transformer API, I 
> would use (device, key) for my state store key, without changing the record 
> key.
>  
> _(poke_ [~vvcephei] _following our discussion on users ml)_
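
For readers following along, the custom-partitioner workaround mentioned in the 
description can be sketched roughly as below. This is plain Java with no Kafka 
dependency, and every name in it is hypothetical. It assumes the composite key 
is made of two strings and uses `String.hashCode` as a stand-in hash, so it 
would not reproduce the partitioning of a topic written by the default producer 
partitioner (which hashes the serialized key bytes with murmur2). In a real 
topology this logic would presumably live in a `StreamPartitioner` 
implementation.

```java
class DevicePartitioning {
    /**
     * Chooses a partition from the device part of a (device, metric) key.
     * The metric is deliberately ignored, so every metric of a device lands
     * in the same partition. The hash function is a stand-in (assumption).
     */
    static int partitionFor(String device, String metric, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) for negative hashes.
        return Math.floorMod(device.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        int p1 = partitionFor("d1", "temperature", numPartitions);
        int p2 = partitionFor("d1", "humidity", numPartitions);
        // Both metrics of device d1 map to the same partition.
        System.out.println(p1 == p2);
    }
}
```

The point of the design is that the grouping key can change to (device, metric) 
while the partition is still derived from the device alone, so records never 
mix with those of a device whose stream-time is far ahead.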



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
