I've never used that dedup transformer before, but what you've got looks right. (Though if there's a way to hash your message value, or somehow derive a GUID from it, that might be preferable to using the whole value as the dedup key.) As you probably noticed, its state is windowed - so if your use case depends on being able to remove duplicate events over a large (or infinite) time span, then this solution probably isn't a good fit.
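For reference, here's a rough sketch of how that wiring might look end to end, modeled on the Confluent example linked further down in the thread. The store name, window size, serdes, and String types are placeholders, and DeduplicationTransformer is assumed to be the class copied from that example - so treat this as a sketch rather than tested code:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

// Placeholder name and size: the window bounds how long a duplicate can be detected.
final String dedupStoreName = "dedup-store";
final Duration windowSize = Duration.ofMinutes(10);

// Window store holding (dedup key -> first-seen timestamp).
final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore(dedupStoreName, windowSize, windowSize, false),
        Serdes.String(),
        Serdes.Long());

final StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(dedupStoreBuilder);  // the store must be registered with the topology

final KStream<String, String> input =
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Second constructor argument is the id extractor; hashing the value here instead of
// using it verbatim. The store name passed to transform() must match whatever name the
// copied transformer looks up in its init() method.
final KStream<String, String> deduplicated = input.transform(
    () -> new DeduplicationTransformer<>(
        windowSize.toMillis(),
        (key, value) -> String.valueOf(value.hashCode())),
    dedupStoreName);

deduplicated.to("output-topic");  // or hook in the rest of the existing pipeline here

The main things to note are that the window store has to be registered via addStateStore() and that its name has to be passed to transform(), since the transformer fetches the store by name when it initializes.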
Alex

On Tue, Dec 10, 2019 at 8:40 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi Alex,
> Thanks for the quick response.
> What I have is around 8 streams branched from a single stream, which down
> the line get joined back into one.
> Now each branched stream can have duplicates, and when joining all this
> data I just end up with endless tuples of data.
>
> So what I was thinking is: what if I can actually remove all the
> duplicates right at the start? Then I will have manageable data to do
> the joins.
>
> So I checked the code, and wanted to know how this can be inserted into
> the existing pipeline. Basically my current code is something like this:
>
> Properties props = new Properties();
> ....
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<K, V> input = builder.stream("input-topic");
> input.... //my pipeline starts
> .....
> final Topology topology = builder.build(props);
> final KafkaStreams streams = new KafkaStreams(topology, props);
> ......
> streams.start();
>
> This will change to:
>
> Properties props = new Properties();
> ....
> final StoreBuilder<WindowStore<K, V>> dedupStoreBuilder = .....
> .....
> final KStream<K, V> input = builder.stream("input-topic");
> final KStream<K, V> deduplicated = input.transform(() -> new
>     DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value));
> deduplicated.... //my existing pipeline
> ..... //rest same as before
> streams.start();
>
> Let me know if I got this right.
>
> Thanks
> Sachin
>
>
> On Tue, Dec 10, 2019 at 6:59 PM Alex Brekken <brek...@gmail.com> wrote:
>
> > Hi Sachin, is your goal to prevent any records with a duplicate key
> > from ever getting sent downstream? The KTable you have in your example
> > will of course have the most recent record for a given key, but it
> > will still emit updates. So if key "A" arrives a second time (with no
> > change to the value), it will still be emitted. (Depending on how
> > rapidly you get duplicate events, some might get removed by internal
> > caching, but you will still likely get at least 1 of those duplicates
> > sent further downstream through the topology.) Take a look at this
> > example from Confluent to see if it would work for your case:
> >
> > https://github.com/confluentinc/kafka-streams-examples/blob/5.3.1-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
> >
> > Also, what is the reason for wanting to eliminate duplicates? Do you
> > have downstream aggregators that you don't want to incorrectly count
> > duplicated events?
> >
> > Alex
> >
> > On Tue, Dec 10, 2019 at 7:05 AM Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > I am using streams and I get messages like: (K, V)
> > > (A, a), (B, b), (C, c), (A, a), (C, c), (A, a) .....
> > > I wanted to define a topology which would filter out duplicate
> > > messages from upstream.
> > >
> > > I want to know if this is possible?
> > > The code I have written to do this is something like this:
> > >
> > > source.groupBy((k, v) -> new Key(k, v))
> > >       .reduce((av, nv) -> nv)
> > >       .toStream()
> > >
> > > So basically I create a new key which is a combination of the
> > > existing (k, v). Then I group by it and reduce it to a table to just
> > > store the final value. Finally I convert that to a stream to be used
> > > downstream.
> > >
> > > My question is: would this logic work?
> > > Like if I get another message (A, a), will it basically replace the
> > > existing (A, a) in the table so that no new message gets appended to
> > > the resulting stream?
> > >
> > > Is my understanding correct?
> > >
> > > If not, then is there any other way to achieve this?
> > >
> > > Thanks
> > > Sachin