Hi Sachin, is your goal to prevent any records with a duplicate key from
ever getting sent downstream?  The KTable you have in your example will of
course hold the most recent record for a given key, but it will still emit
updates.  So if key "A" arrives a second time (with no change to the
value), it will still be emitted.  (Depending on how rapidly the duplicate
events arrive, some may be absorbed by internal caching, but you will
still likely see at least one of those duplicates sent further downstream
through the topology.)  Take a look at this example from Confluent to see
if it would work for your case:
https://github.com/confluentinc/kafka-streams-examples/blob/5.3.1-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
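In case it helps, here is a rough sketch of the idea behind that example:
route the stream through a transformer backed by a state store and only
forward records it has not seen before.  (The real example uses a windowed
store keyed by a unique event ID, with a retention period so the store does
not grow forever; this simplified version just uses a plain key-value store
keyed on the key/value pair, and all the names like "input-topic" and
"dedup-store" below are made up.)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class DeduplicationSketch {

  // Forwards a record only if its (key, value) pair has not been seen before.
  static class DeduplicationTransformer
      implements Transformer<String, String, KeyValue<String, String>> {

    private KeyValueStore<String, String> seen;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
      seen = (KeyValueStore<String, String>) context.getStateStore("dedup-store");
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
      final String storeKey = key + "|" + value;
      if (seen.get(storeKey) != null) {
        return null;  // duplicate -> drop it, nothing goes downstream
      }
      seen.put(storeKey, "");
      return KeyValue.pair(key, value);
    }

    @Override
    public void close() {}
  }

  public static Topology buildTopology() {
    final StreamsBuilder builder = new StreamsBuilder();

    // State store that remembers which (key, value) pairs were already forwarded.
    builder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("dedup-store"),
            Serdes.String(), Serdes.String()));

    final KStream<String, String> source =
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
    source.transform(DeduplicationTransformer::new, "dedup-store")
          .to("output-topic");

    return builder.build();
  }
}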


Also, what is the reason for wanting to eliminate the duplicates?  Do you
have downstream aggregators that would otherwise incorrectly count the
duplicated events?

Alex

On Tue, Dec 10, 2019 at 7:05 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I am using streams and I get messages like: (K, V)
> (A, a), (B, b), (C, c), (A, a), (C, c), (A, a) .....
> I wanted to define a topology which would filter out duplicate messages
> from upstream.
>
> I want to know if this is possible?
> The code I have written to do this is something like this:
>
> source.groupBy((k, v) -> new Key(k, v))
>       .reduce((av, nv) -> nv)
>       .toStream()
>
> So basically I create a new key which is a combination of the existing (k, v).
> Then I group by it and reduce it to a table to just store the final value.
> Finally I convert that to a stream to be used downstream.
>
> My question is: would this logic work?
> Like if I get another message (A, a) it will basically replace the existing
> (A, a) in the table and no new message would get appended to the resulting
> stream.
>
> Is my understanding correct?
>
> If not then is there any other way to achieve this?
>
> Thanks
> Sachin
>
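For completeness, a version of the quoted topology that compiles, using a
concatenated String in place of the custom Key class just to keep the
sketch self-contained; the serdes and topic name are made up.  Note that
even with this in place, each duplicate still produces an update from
reduce() and therefore a record in the resulting stream, which is the
behavior described above.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source =
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Re-key on the (key, value) combination, keep only the latest record per
// combined key, and turn the table's changelog back into a stream.  The
// resulting stream is keyed on the combined key, not the original key.
final KStream<String, String> result = source
    .groupBy((k, v) -> k + "|" + v, Grouped.with(Serdes.String(), Serdes.String()))
    .reduce((oldValue, newValue) -> newValue)
    .toStream();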
