Hello Guozhang and Adam,

Regarding Guozhang's proposal, please see the recent discussion about
`transformValues()` and returning `null` from the transformer:
https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17044602.

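With the current behavior, a `null` returned from `transformValues()` is
still forwarded downstream, so you need an explicit filter. The topology
should be:

`stream.transformValues(...).filter((k, v) -> v != null).groupByKey().aggregate(...)`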
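
To illustrate the data flow, here is a minimal plain-Java sketch. It does
not use the Kafka Streams API: the `HashMap` stands in for the state store
that `aggregate()` is connected to, and the class name, the method names,
and the "latest status wins" aggregator are assumptions made up for this
example only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of transformValues -> filter -> groupByKey -> aggregate.
// Hypothetical names throughout; this is not Kafka Streams code.
class SkipAfterSuccess {
    // Stand-in for the state store backing aggregate(): key -> current status.
    private final Map<String, String> store = new HashMap<>();
    // Records that would be emitted downstream of the aggregation.
    private final List<String> emitted = new ArrayList<>();

    // Models the transformer: return null once the key is already "success".
    String transformValue(String key, String event) {
        return "success".equals(store.get(key)) ? null : event;
    }

    // Models filter((k, v) -> v != null) followed by the aggregation step.
    void process(String key, String event) {
        String value = transformValue(key, event);
        if (value == null) {
            return; // dropped by the filter, so the aggregator never runs
        }
        store.put(key, value);          // assumed aggregator: latest status wins
        emitted.add(key + "=" + value); // the aggregate emits an update
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        SkipAfterSuccess topology = new SkipAfterSuccess();
        topology.process("order-1", "failure");
        topology.process("order-1", "success");
        topology.process("order-1", "failure"); // ignored: already success
        System.out.println(topology.emitted()); // [order-1=failure, order-1=success]
    }
}
```

In the real topology the null check has to be the separate `filter()` step
shown above, because the transformer's `null` is not dropped for you.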

Best,
Bruno

On Tue, Feb 25, 2020 at 2:58 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> Hello Adam,
>
> It seems your intention is not to "avoid emitting if the new aggregation
> result is the same as the old aggregation" but to "avoid processing the
> aggregation at all if its state is already some certain value", right?
>
> In this case I think you can try something like this:
>
> *stream.transformValues().groupByKey().aggregate()*
>
> where transformValues is just used as a slightly complicated "filter"
> operation, in which you can access the state store that "aggregate" is
> connected to and check whether the corresponding entry is already
> `success`; if yes, let `transformValues` return `null`, which means forward
> nothing to the downstream.
>
> The reason to use transformValues instead of transform is to make sure you
> do not introduce unnecessary repartitioning here.
>
> Guozhang
>
>
>
> On Mon, Feb 24, 2020 at 2:01 PM Adam Rinehart <adam.rineh...@gmail.com>
> wrote:
>
> > So I am trying to process incoming events that may or may not actually
> > update the state of my output object. Originally I was doing this with a
> > KStream/KTable join, until I saw the discussion about "KTable in Compact
> > Topic takes too long to be updated", when I switched to
> > groupByKey().aggregate().
> >
> > Some events may not result in a state change. For example, once I have an
> > incoming success event, I emit a success output and future incoming failure
> > events will be ignored.
> >
> > My intention is to only emit a record from the aggregate KTable if the
> > aggregate record actually changed. But I can't figure out how to do that
> > within the aggregator interface. I've tried returning the incoming
> > aggregate object when nothing changes, but I still get a record emitted
> > from the table.
> >
>
>
> --
> -- Guozhang
