Hi Mathieu,

Ah, that is unfortunate. I believe your analysis is correct. In general, we 
have no good solution to the problem of upstream tasks moving ahead of each 
other and causing disorder in the repartition topics. Guozhang has done a 
substantial amount of thinking on this subject, though, and has some ideas for 
how we can improve the behavior. 

However, your situation is a special case, since the data actually doesn’t need 
to be shuffled. Ideally, there would be a way to say “I assert that the 
partitioning doesn’t need to change despite this key change”. That actually 
seems like a good feature for the repartition operator.

In the absence of that feature, I think you’re on the right track. If you 
specify a partitioner that produces exactly the same partitioning as the 
source, you _should_ be able to avoid any shuffling, although there will still 
be a repartition topic there. 
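
For example, here's a rough sketch of what I mean. The key and value types
(DeviceMetricKey, SensorValue), the serdes, and the field accessors are just
placeholders for whatever you actually have, and the hashing mirrors the
producer's default murmur2 partitioner, so double-check that it matches how
the source topic was actually partitioned:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;

// Partition on deviceId only, using the same murmur2 hash as the default
// producer partitioner, so each (deviceId, metric) record stays in the
// partition its source record came from.
StreamPartitioner<DeviceMetricKey, SensorValue> sameAsSource =
    (topic, key, value, numPartitions) -> {
        byte[] deviceIdBytes = key.deviceId().getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(deviceIdBytes)) % numPartitions;
    };

KStream<DeviceMetricKey, SensorValue> byDeviceAndMetric = input
    // re-key to (deviceId, metric) for the per-metric aggregations
    .selectKey((deviceId, value) -> new DeviceMetricKey(deviceId, value.metric()))
    // explicit repartition with a partitioner that reproduces the source
    // partitioning, so records don't actually move across partitions
    .repartition(Repartitioned.<DeviceMetricKey, SensorValue>as("by-device-metric")
        .withKeySerde(deviceMetricKeySerde)
        .withValueSerde(sensorValueSerde)
        .withStreamPartitioner(sameAsSource));

Note that for the partitioning to really line up, the repartition topic also
needs the same number of partitions as the source topic; you can pin that with
Repartitioned#withNumberOfPartitions.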

I hope this helps,
John

On Fri, Nov 20, 2020, at 04:14, Mathieu D wrote:
> Hello there,
> 
> We're processing IOT device data, each device sending several metrics.
> 
> When we upgrade our streams app, we set a brand-new 'application.id' to
> reprocess a bit of past data, so that the state stores and aggregations
> are warmed up and all outputs will be valid. Downstream is designed for
> "at least once", so this bit of reprocessing is not a problem.
> 
> When this restart + reprocessing occurs, we observe a peak of "Skipping
> record for expired window" / "Skipping record for expired segment"
> warnings in the logs.
> 
> My understanding so far is this:
> - a part of our topology is keyed by deviceId.
> - during the reprocessing, some tasks move faster than others through
> their partitions, which means there's a substantial difference between
> the stream-times of the various tasks
> - at some point in the topology, we re-key the data by (deviceId, metric)
> for "group by metric" aggregations
> - this shuffles the data: deviceId1 was in partition 1 with eventTime1,
> deviceId2 was in partition 2 with eventTime2, and now, by the magic of
> hashing a (deviceId, metric) key, they are pushed together into the same
> partition X. If eventTime2 is far ahead of eventTime1, the stream-time of
> that partition jumps ahead and all the windows for deviceId1's records
> expire at once.
> 
> Is this analysis correct?
> 
> Then, what's the proper way to avoid this? Manually do a .repartition()
> with a custom partitioner after each .selectKey() to (deviceId, metric),
> and before going through the aggregations?
> 
> Any other advice?
> 
> Thanks for your insights
> 
> Mathieu
>
