Hi,
This is getting really interesting.
If we don't want a record to be emitted downstream, the only way to do that
is via transform (or flatTransform).

Since we are now reverting the fix for null records in transformValues and
changing the docs instead, doesn't this add a bit of confusion for users?
The Confluent docs say that
transformValues is preferable to transform because it will not cause data
re-partitioning.

So in the many cases where the record's value alone is sufficient to
determine whether we should emit it downstream, we would still be forced to
use transform and unnecessarily cause data re-partitioning. Won't this be
inefficient?
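
To make the concern concrete, here is a minimal plain-Java sketch of the
value-only pattern from Bruno's mail: a value transform that marks unwanted
records by returning null, followed by a null filter. It deliberately has no
Kafka dependency; `NullFilterSketch`, `transformValues` and `dropNullValues`
are hypothetical stand-ins I made up for illustration, not the Kafka Streams
API, and the "drop" marker value is an assumption too.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class NullFilterSketch {

    // Toy stand-in for a value-only transform: the key is copied unchanged,
    // and a null result is still forwarded as a record with a null value.
    static <K, V, R> List<Map.Entry<K, R>> transformValues(
            List<Map.Entry<K, V>> records, Function<V, R> transformer) {
        List<Map.Entry<K, R>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            out.add(new SimpleEntry<>(record.getKey(),
                                      transformer.apply(record.getValue())));
        }
        return out;
    }

    // Toy stand-in for filter((k, v) -> v != null): drops null-valued records.
    static <K, V> List<Map.Entry<K, V>> dropNullValues(
            List<Map.Entry<K, V>> records) {
        return records.stream()
                .filter(record -> record.getValue() != null)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = new ArrayList<>();
        input.add(new SimpleEntry<>("k1", "keep"));
        input.add(new SimpleEntry<>("k2", "drop"));

        // The decision to emit depends on the value only (assumed marker: "drop").
        List<Map.Entry<String, String>> transformed =
                transformValues(input, v -> "drop".equals(v) ? null : v);
        List<Map.Entry<String, String>> emitted = dropNullValues(transformed);

        System.out.println("after transform: " + transformed.size() + " records");
        System.out.println("after filter:    " + emitted.size() + " records");
    }
}
```

In this model the keys never change, so nothing here would call for
re-partitioning; the cost is the extra filter step after the transform.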

Thanks
Sachin



On Tue, Feb 25, 2020 at 10:52 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hello Guozhang and Adam,
>
> Regarding Guozhang's proposal please see recent discussions about
> `transformValues()` and returning `null` from the transformer:
>
> https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17044602
> .
>
> With the current behavior, the commands should be:
>
> `stream.transformValues(...).filter((k, v) -> v !=
> null).groupByKey().aggregate()`
>
> Best,
> Bruno
>
> On Tue, Feb 25, 2020 at 2:58 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Hello Adam,
> >
> > It seems your intention is not to "avoid emitting if the new aggregation
> > result is the same as the old aggregation" but to "avoid processing the
> > aggregation at all if its state is already some certain value", right?
> >
> > In this case I think you can try something like this:
> >
> > *stream.transformValues().groupByKey().aggregate()*
> >
> > where transformValues is just used as a slightly complicated "filter"
> > operation, in which you can access the state store that "aggregate" is
> > connected to, and read / check if the corresponding entry is already
> > `success`; if yes, let `transformValues` return `null`, which means
> > forward nothing to the downstream.
> >
> > The reason to use transformValues instead of transform is to make sure you
> > do not introduce unnecessary repartitioning here.
> >
> > Guozhang
> >
> >
> >
> > On Mon, Feb 24, 2020 at 2:01 PM Adam Rinehart <adam.rineh...@gmail.com>
> > wrote:
> >
> > > So I am trying to process incoming events that may or may not actually
> > > update the state of my output object. Originally I was doing this with a
> > > KStream/KTable join, until I saw the discussion about "KTable in Compact
> > > Topic takes too long to be updated", when I switched to
> > > groupByKey().aggregate().
> > >
> > > Some events may not result in a state change. For example, once I have an
> > > incoming success event, I emit a success output and future incoming
> > > failure events will be ignored.
> > >
> > > My intention is to only emit a record from the aggregate KTable if the
> > > aggregate record actually changed. But I can't figure out how to do that
> > > within the aggregator interface. I've tried returning the incoming
> > > aggregate object when nothing changes, but I still get a record emitted
> > > from the table.
> > >
> >
> >
> > --
> > -- Guozhang
>
