[ https://issues.apache.org/jira/browse/KAFKA-7224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097684#comment-17097684 ]

John Roesler commented on KAFKA-7224:
-------------------------------------

Hi all,

Thanks for the good points all around.

Just to close the loop on _this_ ticket (disk-based suppression): the 
performance was _extremely_ poor. It was so poor that I concluded that anyone 
with enough volume to actually need suppression would find it too slow to be 
useful. The problem is that we need to check the beginning of the suppression 
buffer on every (or almost every) record to see whether we need to emit 
something. For an in-memory store this is fine, but for RocksDB in particular, 
scans are very slow. There are fundamental reasons for this, which we don't 
need to get into here.
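
To make that access pattern concrete, here is a minimal in-memory model of a 
stream-time suppression buffer. It is a hypothetical sketch, not the Kafka 
Streams implementation: {{StreamTimeSuppressionBuffer}}, its methods, and the 
single fixed time limit are illustrative assumptions. The point is 
{{evictExpired()}}, which runs after every record and peeks at the oldest 
buffered entry. With a {{TreeMap}} that peek is cheap. On a RocksDB-backed 
store, the equivalent step is a scan from the start of the buffer, which is the 
per-record cost described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a stream-time suppression buffer
// (illustrative only; not the Kafka Streams implementation).
class StreamTimeSuppressionBuffer {
    private final long timeLimitMs;
    // Keys ordered by the time they entered the buffer, so the head is the oldest entry.
    private final TreeMap<Long, List<String>> keysByBufferTime = new TreeMap<>();
    private final Map<String, String> latestValue = new HashMap<>();
    private long streamTime = Long.MIN_VALUE;

    StreamTimeSuppressionBuffer(long timeLimitMs) {
        this.timeLimitMs = timeLimitMs;
    }

    // Buffers one record and returns whatever has now timed out, as "key=value" strings.
    List<String> put(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        if (!latestValue.containsKey(key)) {
            // First update for this key: its deadline is based on this timestamp.
            keysByBufferTime.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(key);
        }
        // Later updates replace the value but keep the original deadline.
        latestValue.put(key, value);
        return evictExpired();
    }

    // The per-record check: peek at the head of the buffer and emit while it has expired.
    private List<String> evictExpired() {
        List<String> emitted = new ArrayList<>();
        while (!keysByBufferTime.isEmpty()
                && streamTime - keysByBufferTime.firstKey() >= timeLimitMs) {
            for (String key : keysByBufferTime.pollFirstEntry().getValue()) {
                emitted.add(key + "=" + latestValue.remove(key));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        StreamTimeSuppressionBuffer buffer = new StreamTimeSuppressionBuffer(10);
        System.out.println(buffer.put("a", "1", 0));  // []
        System.out.println(buffer.put("a", "2", 5));  // []
        System.out.println(buffer.put("b", "1", 12)); // [a=2]
    }
}
```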

It might be possible to cleverly engineer our way around the problem, but 
anything I came up with just sounded too complicated to be worth it.

However, this is only necessary if you want the semantics of Suppress (each 
record times out individually, based on stream time). If you instead just want 
to buffer everything on disk and then emit everything you've buffered, say once 
an hour, you can do it much more efficiently in a custom transformer: put all 
incoming data into a state store, then schedule a wall-clock punctuation that 
scans the entire store and forwards everything. (This needs a {{Transformer}} 
via {{transform}} or {{flatTransform}} rather than {{flatTransformValues}}, 
because a {{ValueTransformer}} is not allowed to call {{context.forward()}}.)
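
The buffer-everything-then-flush pattern can be modeled in plain Java as below. 
This is a hypothetical sketch, not a Kafka Streams API: {{BufferAndFlush}}, 
{{onRecord}} and {{punctuate}} are illustrative names, and the {{TreeMap}} 
stands in for a persistent key-value store (so it keeps only the latest value 
per key). In a real topology, {{punctuate}} would be the callback registered 
with {{ProcessorContext#schedule(Duration, PunctuationType.WALL_CLOCK_TIME, 
Punctuator)}}. The per-record path is a single put, and the expensive scan 
happens once per interval instead of on every record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of "buffer everything, flush on a wall-clock schedule".
class BufferAndFlush {
    // Stand-in for an on-disk key-value store; holds the latest value per key.
    private final TreeMap<String, String> store = new TreeMap<>();

    // Per-record path: a single put, with no scan of the buffer.
    void onRecord(String key, String value) {
        store.put(key, value);
    }

    // Wall-clock punctuation: one sequential scan of the whole store,
    // forward everything (returned here as "key=value"), then clear it.
    List<String> punctuate() {
        List<String> forwarded = new ArrayList<>();
        for (Map.Entry<String, String> entry : store.entrySet()) {
            forwarded.add(entry.getKey() + "=" + entry.getValue());
        }
        store.clear();
        return forwarded;
    }

    public static void main(String[] args) {
        BufferAndFlush buffer = new BufferAndFlush();
        buffer.onRecord("b", "1");
        buffer.onRecord("a", "1");
        buffer.onRecord("b", "2");
        System.out.println(buffer.punctuate()); // [a=1, b=2]
        System.out.println(buffer.punctuate()); // []
    }
}
```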

The one complication is that the wall-clock punctuation currently blocks the 
StreamThread, so you need a rough idea of how long the scan takes (measured 
empirically) and must set {{max.poll.interval.ms}} with enough headroom that 
the thread won't drop out of the consumer group.
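
One way to turn that measurement into a setting is sketched below. The helper 
{{PollIntervalConfig.withHeadroom}}, the idea of adding a per-batch processing 
budget, the 2x safety factor and the sample numbers are all assumptions for 
illustration, not Kafka guidance. The key string {{consumer.max.poll.interval.ms}} 
is what {{StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)}} 
produces. It is spelled out here so the sketch compiles without Kafka on the 
classpath.

```java
import java.util.Properties;

// Hypothetical helper: derive max.poll.interval.ms from an observed punctuation time.
class PollIntervalConfig {
    // Kafka Streams applies "consumer."-prefixed settings to its consumers.
    static final String MAX_POLL_INTERVAL_KEY = "consumer.max.poll.interval.ms";

    // The StreamThread cannot poll while the punctuation runs, so the full scan plus
    // normal processing of one batch should fit, with a safety margin, in the interval.
    static long withHeadroom(long observedMaxPunctuationMs, long normalBatchMs, double safetyFactor) {
        return (long) Math.ceil((observedMaxPunctuationMs + normalBatchMs) * safetyFactor);
    }

    public static void main(String[] args) {
        // Made-up numbers: a 120 s worst-case scan, a 30 s batch budget, 2x headroom.
        long interval = withHeadroom(120_000L, 30_000L, 2.0);
        Properties props = new Properties();
        props.put(MAX_POLL_INTERVAL_KEY, Long.toString(interval));
        System.out.println(props.getProperty(MAX_POLL_INTERVAL_KEY)); // 300000
    }
}
```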

This is bleeding more into the domain of KIP-424, which does seem more like 
what [~maatdeamon] needs (just agreeing with the discussion so far). I don't 
think there was any technical impediment to implementing that one, it was just 
that the KIP discussion petered out (which happens sometimes). I guess, 
building on my last paragraphs, _if_ we had wall-clock-based suppression, 
_then_ it might make more sense to offer on-disk suppression in addition to 
in-memory, as at least the (wall-clock + on-disk) configuration could be 
performant. But it would need much more design. I'm still unsure if on-disk 
suppression is really a good idea to implement in the DSL.

A final thought worth mentioning in this discussion is that KIP-557 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]) 
will go a long way toward dropping unnecessary updates. This isn't the same 
thing as suppressing intermediate results, but it will help a great deal by 
dropping idempotent updates early in the topology, so they never have to be 
suppressed at the end.
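
The core idea of emit-on-change, dropping an update whose value equals the 
previous value for that key, can be modeled as below. This is a hypothetical 
plain-Java sketch, not the KIP-557 implementation. {{EmitOnChangeFilter}} is an 
illustrative name, and the {{HashMap}} stands in for the table's state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of emit-on-change (illustrative only; not the KIP-557 implementation).
class EmitOnChangeFilter {
    // Last value seen per key; in a real topology this would come from the table's store.
    private final Map<String, String> lastValue = new HashMap<>();

    // Returns true if the update should be forwarded, false if it is an idempotent update.
    boolean shouldEmit(String key, String newValue) {
        boolean seenBefore = lastValue.containsKey(key);
        String oldValue = lastValue.put(key, newValue);
        return !seenBefore || !Objects.equals(oldValue, newValue);
    }

    public static void main(String[] args) {
        EmitOnChangeFilter filter = new EmitOnChangeFilter();
        System.out.println(filter.shouldEmit("k", "1")); // true  (first value)
        System.out.println(filter.shouldEmit("k", "1")); // false (same value, dropped)
        System.out.println(filter.shouldEmit("k", "2")); // true  (value changed)
    }
}
```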

Thanks,

-John

> KIP-328: Add spill-to-disk for Suppression
> ------------------------------------------
>
>                 Key: KAFKA-7224
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7224
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables]
> Following on KAFKA-7223, implement the spill-to-disk buffering strategy.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
