[jira] [Commented] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable
[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689838#comment-16689838 ] Matthias J. Sax commented on KAFKA-7595: Glad it works. Thanks for confirmation! > Kafka Streams: KTrable to KTable join introduces duplicates in downstream > KTable > > > Key: KAFKA-7595 > URL: https://issues.apache.org/jira/browse/KAFKA-7595 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Vik Gamov >Priority: Major > > When perform KTable to KTable join after aggregation, there are duplicates in > resulted KTable. > 1. caching disabled, no materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}} > {{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue());}} > 2. caching disabled, materialized => duplicate > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > 3. caching enabled, materiazlized => all good > {{// Enable record cache of size 10 MB.}} > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 > * 1024 * 1024L);}} > {{// Set commit interval to 1 second.}} > {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > 1000);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > > Demo app > [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable
[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688927#comment-16688927 ] Vik Gamov commented on KAFKA-7595: -- [~mjsax] [~vvcephei] Thank you for input. I was able to rewrite my app according to Matthias recommendations [https://github.com/gAmUssA/streams-movie-demo/blob/refactoring/1/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L165] > Kafka Streams: KTrable to KTable join introduces duplicates in downstream > KTable > > > Key: KAFKA-7595 > URL: https://issues.apache.org/jira/browse/KAFKA-7595 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Vik Gamov >Priority: Major > > When perform KTable to KTable join after aggregation, there are duplicates in > resulted KTable. > 1. caching disabled, no materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}} > {{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue());}} > 2. caching disabled, materialized => duplicate > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > 3. caching enabled, materiazlized => all good > {{// Enable record cache of size 10 MB.}} > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 > * 1024 * 1024L);}} > {{// Set commit interval to 1 second.}} > {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > 1000);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > > Demo app > [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable
[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677173#comment-16677173 ] Matthias J. Sax commented on KAFKA-7595: Just read [~vvcephei] comments. This is not a bug but expected behavior. We have an example in the wiki about how to compute average correctly: [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?] > Kafka Streams: KTrable to KTable join introduces duplicates in downstream > KTable > > > Key: KAFKA-7595 > URL: https://issues.apache.org/jira/browse/KAFKA-7595 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Vik Gamov >Priority: Major > > When perform KTable to KTable join after aggregation, there are duplicates in > resulted KTable. > 1. caching disabled, no materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}} > {{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue());}} > 2. caching disabled, materialized => duplicate > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > 3. caching enabled, materiazlized => all good > {{// Enable record cache of size 10 MB.}} > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 > * 1024 * 1024L);}} > {{// Set commit interval to 1 second.}} > {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > 1000);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > > Demo app > [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable
[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676728#comment-16676728 ] Damian Guy commented on KAFKA-7595: --- [~vvcephei] your reasoning seems valid to me > Kafka Streams: KTrable to KTable join introduces duplicates in downstream > KTable > > > Key: KAFKA-7595 > URL: https://issues.apache.org/jira/browse/KAFKA-7595 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Vik Gamov >Priority: Major > > When perform KTable to KTable join after aggregation, there are duplicates in > resulted KTable. > 1. caching disabled, no materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}} > {{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue());}} > 2. caching disabled, materialized => duplicate > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > 3. caching enabled, materiazlized => all good > {{// Enable record cache of size 10 MB.}} > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 > * 1024 * 1024L);}} > {{// Set commit interval to 1 second.}} > {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > 1000);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > > Demo app > [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable
[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675920#comment-16675920 ] John Roesler commented on KAFKA-7595: - [~damianguy], Can you confirm my reasoning above? I don't want to confuse the issue with my own misunderstanding... > Kafka Streams: KTrable to KTable join introduces duplicates in downstream > KTable > > > Key: KAFKA-7595 > URL: https://issues.apache.org/jira/browse/KAFKA-7595 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Vik Gamov >Priority: Major > > When perform KTable to KTable join after aggregation, there are duplicates in > resulted KTable. > 1. caching disabled, no materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}} > {{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue());}} > 2. caching disabled, materialized => duplicate > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > 3. caching enabled, materiazlized => all good > {{// Enable record cache of size 10 MB.}} > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 > * 1024 * 1024L);}} > {{// Set commit interval to 1 second.}} > {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > 1000);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > > Demo app > [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable
[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675891#comment-16675891 ] John Roesler commented on KAFKA-7595: - Not sure if I'm thinking about this properly, but... For example you get some event on the left and on the right, and you want to produce . Let's say you get the left event first. # With *caching disabled*, *whether or not the join is materialized*, I'd expect to see followed by . It's not clear to me if you call this a "duplicate". # With *caching enabled* and the *join not materialized*, I'd expect to see duplicates: followed by . ** This could happen if gets cached on the left and gets cached on the right. They only trigger the join upon flush, and they trigger it independently, so when the left trigger happens, the right value is already visible and vice-versa, hence the duplicates. ** Even though the caches try to de-duplicate events, they are mutually oblivious (they don't know their results will later be used in a join, and they each don't know the other exists), so they can't cooperate to de-duplicate the results. # If the *join is materialized AND caching is enabled*, then there are still "technically" duplicate events, but they get de-duplicated by the join's cache. *So this this in mind, can you clarify which of these you see contradicted?* *In particular, I wasn't sure if you are calling the sequence ", " a "duplicate" or not. I _think_ this result would be expected for your cases 1 and 2.* And just a plug for this new feature, if you'd like to achieve de-duplication of either sequence ", " or ", " without having to materialize the join, as of 2.1, you can use the new "suppress" operator: https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables > Kafka Streams: KTrable to KTable join introduces duplicates in downstream > KTable > > > Key: KAFKA-7595 > URL: https://issues.apache.org/jira/browse/KAFKA-7595 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Vik Gamov >Priority: Major > > When perform KTable to KTable join after aggregation, there are duplicates in > resulted KTable. > 1. caching disabled, no materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}} > {{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue());}} > 2. caching disabled, materialized => duplicate > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > 3. caching enabled, materiazlized => all good > {{// Enable record cache of size 10 MB.}} > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 > * 1024 * 1024L);}} > {{// Set commit interval to 1 second.}} > {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > 1000);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > > Demo app > [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)