Hello Adam,

It seems your intention is not to "avoid emitting if the new aggregation result is the same as the old aggregation" but to "avoid processing the aggregation at all if its state is already some certain value", right?
In this case I think you can try something like this: *stream.transformValues().groupByKey().aggregate()*, where transformValues is used as a slightly more complicated "filter" operation. Inside it you can access the state store that "aggregate" is connected to and check whether the corresponding entry is already `success`; if it is, let `transformValues` return `null` so that the record is not aggregated downstream.

The reason to use transformValues instead of transform is to make sure you do not introduce unnecessary repartitioning here: transformValues cannot change the key, so the subsequent groupByKey does not need to repartition.

Guozhang

On Mon, Feb 24, 2020 at 2:01 PM Adam Rinehart <adam.rineh...@gmail.com> wrote:

> So I am trying to process incoming events, that may or may not actually
> update the state of my output object. Originally I was doing this with a
> KStream/KTable join, until I saw the discussion about "KTable in Compact
> Topic takes too long to be updated", when I switched to
> groupByKey().aggregate().
>
> Some events may not result in a state change. For example, once I have an
> incoming success event, I emit a success output and future incoming failure
> events will be ignored.
>
> My intention is to only emit a record from the aggregate KTable if the
> aggregate record actually changed. But I can't figure out how to do that
> within the aggregator interface. I've tried returning the incoming
> aggregate object when nothing changes, but I still get a record emitted
> from the table.
>

--
-- Guozhang
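
For reference, the check-then-skip flow described in the reply can be modelled in plain Java. This is only a sketch of the control flow, not Kafka Streams code: the `HashMap` stands in for the state store behind aggregate(), `filterValue` stands in for the transformValues step, and the class name, the `job-1` key and the "latest event becomes the state" aggregation rule are all hypothetical and chosen just for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the control flow only; this is NOT Kafka Streams code.
// The HashMap stands in for the state store that backs aggregate().
class TerminalStateFilter {
    static final String SUCCESS = "success";

    // Stand-in for the aggregate's state store: key -> current aggregate.
    private final Map<String, String> store = new HashMap<>();
    // Stand-in for the records emitted by the resulting table.
    private final List<String> emitted = new ArrayList<>();

    // Models the transformValues() step: returns null once the key is terminal.
    String filterValue(String key, String event) {
        return SUCCESS.equals(store.get(key)) ? null : event;
    }

    // Models aggregate() with a hypothetical rule: the latest event becomes the state.
    void aggregate(String key, String event) {
        store.put(key, event);
        emitted.add(key + "=" + event);
    }

    // One record through the pipeline: filter first, aggregate only if a value is left.
    void process(String key, String event) {
        String filtered = filterValue(key, event);
        if (filtered != null) {
            aggregate(key, filtered);
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        TerminalStateFilter p = new TerminalStateFilter();
        p.process("job-1", "failure");
        p.process("job-1", "success");
        p.process("job-1", "failure"); // skipped: job-1 is already "success"
        System.out.println(p.emitted()); // prints [job-1=failure, job-1=success]
    }
}
```

The point of the model is that the late failure event never reaches the aggregation step, so no update is emitted for it. In a real topology the store lookup would go through the state store that the aggregation is materialized into, and how that store is named and connected to the transformer is left out here.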