Hi Alex,
Thanks for the quick response.
What I have is around 8 streams branched from a single stream, which
further down the line get joined back into one.
Each branched stream can contain duplicates, and when joining all this data
I end up with a practically endless number of tuples.

So I was thinking: if I can remove all the duplicates right at the start,
I will have a manageable amount of data to join.

So I checked the code and wanted to know how it can be inserted into my
existing pipeline. Basically, my current code is something like this:
Properties props = new Properties();
....
final StreamsBuilder builder = new StreamsBuilder();
final KStream<K, V> input = builder.stream("input-topic");
input.... //my pipeline starts
.....
final Topology topology = builder.build(props);
final KafkaStreams streams = new KafkaStreams(topology, props);
......
streams.start();

This will change to:
Properties props = new Properties();
....
final StoreBuilder<WindowStore<K, V>> dedupStoreBuilder = .....
.....
final KStream<K, V> input = builder.stream("input-topic");
final KStream<K, V> deduplicated = input.transform(
    () -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value));
deduplicated.... //my existing pipeline
..... //rest same as before
streams.start();

Let me know if I got this right.
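
To make the intended behaviour concrete, here is a minimal plain-Java sketch
of windowed de-duplication. It is not Kafka Streams code and not the Confluent
implementation: the class and method names (DedupSketch, WindowedDeduplicator,
shouldForward) are hypothetical, an in-memory map stands in for the window
store, and the event ID is assumed to be the key and value combined, similar
to the composite key in the original question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of windowed de-duplication (hypothetical names).
class DedupSketch {

    // Remembers when each event ID was last seen. Assumes timestamps are
    // non-decreasing. Old entries are never purged here; a real window
    // store would expire them after its retention period.
    static final class WindowedDeduplicator<E> {
        private final long windowMillis;
        private final Map<E, Long> lastSeen = new HashMap<>();

        WindowedDeduplicator(long windowMillis) {
            this.windowMillis = windowMillis;
        }

        // Returns true if the event should be forwarded downstream, false if
        // the same ID was already seen within the window. The last-seen
        // timestamp is refreshed on every call, including for duplicates.
        boolean shouldForward(E eventId, long timestampMillis) {
            Long previous = lastSeen.put(eventId, timestampMillis);
            return previous == null || timestampMillis - previous > windowMillis;
        }
    }

    public static void main(String[] args) {
        // Sample input from this thread, key and value combined into one ID.
        String[] events = {"A|a", "B|b", "C|c", "A|a", "C|c", "A|a"};
        WindowedDeduplicator<String> dedup = new WindowedDeduplicator<>(1_000L);
        List<String> forwarded = new ArrayList<>();
        long timestamp = 0L;
        for (String event : events) {
            if (dedup.shouldForward(event, timestamp)) {
                forwarded.add(event);
            }
            timestamp += 100L; // hypothetical spacing between records
        }
        System.out.println(forwarded); // prints [A|a, B|b, C|c]
    }
}
```

In the Streams version the map would be replaced by the window store. Judging
by the linked Confluent example, that store also has to be registered with
builder.addStateStore(dedupStoreBuilder) and its name passed as an additional
argument to transform(...), so that the transformer can look it up.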

Thanks
Sachin


On Tue, Dec 10, 2019 at 6:59 PM Alex Brekken <brek...@gmail.com> wrote:

> Hi Sachin, is your goal to prevent any records with a duplicate key from
> ever getting sent downstream?  The KTable you have in your example will of
> course have the most recent record for a given key, but it will still emit
> updates.  So if key "A" arrives a second time (with no change to the
> value), it will still be emitted.  (Depending on how rapidly you get
> duplicate events, some might get removed by internal caching, but you will
> still likely get at least one of those duplicates sent further downstream
> through the topology.)  Take a look at this example from Confluent to see
> if it would work for your case:
>
> https://github.com/confluentinc/kafka-streams-examples/blob/5.3.1-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
> .
>
>
> Also, what is the reason for wanting to eliminate duplicates?  Do you have
> downstream aggregators that you don't want to incorrectly count duplicated
> events?
>
> Alex
>
> On Tue, Dec 10, 2019 at 7:05 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > I am using streams and I get messages like: (K, V)
> > (A, a), (B, b), (C, c), (A, a), (C, c), (A, a) .....
> > I wanted to define a topology which would filter out duplicate messages
> > from upstream.
> >
> > Is this possible?
> > The code I have written to do this is something like this:
> >
> > source.groupBy((k, v) -> new Key(k, v))
> >       .reduce((av, nv) -> nv)
> >       .toStream()
> >
> > So basically I create a new key which is a combination of the existing
> > (k, v). Then I group by it and reduce it to a table to just store the
> > final value.
> > Finally I convert that to a stream to be used downstream.
> >
> > My question is: would this logic work?
> > If I get another message (A, a), it will basically replace the existing
> > (A, a) in the table, and no new message will get appended to the
> > resulting stream.
> >
> > Is my understanding correct?
> >
> > If not, is there any other way to achieve this?
> >
> > Thanks
> > Sachin
> >
>
