[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

2018-11-16 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689838#comment-16689838
 ] 

Matthias J. Sax commented on KAFKA-7595:


Glad it works. Thanks for the confirmation!

> Kafka Streams: KTable to KTable join introduces duplicates in downstream
> KTable
>
> Key: KAFKA-7595
> URL: https://issues.apache.org/jira/browse/KAFKA-7595
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Vik Gamov
> Priority: Major
>
> When performing a KTable-to-KTable join after an aggregation, there are
> duplicates in the resulting KTable.
>
> 1. caching disabled, not materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);}}
> {{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue());}}
>
> 2. caching disabled, materialized => duplicates
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);}}
> {{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
>
> 3. caching enabled, materialized => all good
> {{// Enable record cache of size 10 MB.}}
> {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);}}
> {{// Set commit interval to 1 second.}}
> {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);}}
> {{KTable ratingCounts = ratingsById.count();}}
> {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}}
> {{KTable ratingAverage = ratingSums.join(ratingCounts,}}
> {{ (sum, count) -> sum / count.doubleValue(),}}
> {{ Materialized.as("average-ratings"));}}
>
> Demo app:
> [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

2018-11-15 Thread Vik Gamov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688927#comment-16688927
 ] 

Vik Gamov commented on KAFKA-7595:

[~mjsax] [~vvcephei] 

Thank you for the input. I was able to rewrite my app according to Matthias's 
recommendations:

[https://github.com/gAmUssA/streams-movie-demo/blob/refactoring/1/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L165]
 

 



[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

2018-11-06 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677173#comment-16677173
 ] 

Matthias J. Sax commented on KAFKA-7595:


Just read [~vvcephei]'s comments. This is not a bug but expected behavior. We 
have an example in the wiki about how to compute an average correctly: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?]
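To make the idea concrete, here is a minimal plain-Java sketch, assuming the approach is to keep the sum and the count together in a single aggregate so that every input record yields exactly one consistent average. It deliberately does not use the Kafka Streams API; the class name, the SumAndCount record, and the sample ratings are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Kafka Streams): one aggregate per key carrying both sum and count. */
final class SingleAggregateAverage {

    /** Running sum and count kept together, so they are always updated in one step. */
    record SumAndCount(double sum, long count) {
        SumAndCount add(double rating) {
            return new SumAndCount(sum + rating, count + 1);
        }

        double average() {
            return sum / count;
        }
    }

    // Hypothetical stand-in for the state store backing the aggregation.
    private final Map<String, SumAndCount> store = new HashMap<>();

    /** Processes one rating and returns the single updated average for that key. */
    double process(String movieId, double rating) {
        SumAndCount updated = store
                .getOrDefault(movieId, new SumAndCount(0.0, 0L))
                .add(rating);
        store.put(movieId, updated);
        return updated.average();
    }

    /** Feeds all ratings for one key and collects every emitted average. */
    static List<Double> averagesFor(String movieId, double... ratings) {
        SingleAggregateAverage table = new SingleAggregateAverage();
        List<Double> emitted = new ArrayList<>();
        for (double rating : ratings) {
            emitted.add(table.process(movieId, rating));
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(averagesFor("movie-1", 4.0, 6.0)); // prints [4.0, 5.0]
    }
}
```

Because the sum and count never live in separate tables, there is no join and therefore no moment at which one side is newer than the other.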

 



[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

2018-11-06 Thread Damian Guy (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676728#comment-16676728
 ] 

Damian Guy commented on KAFKA-7595:

[~vvcephei] your reasoning seems valid to me



[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

2018-11-05 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675920#comment-16675920
 ] 

John Roesler commented on KAFKA-7595:

[~damianguy], Can you confirm my reasoning above? I don't want to confuse the 
issue with my own misunderstanding...



[jira] [Commented] (KAFKA-7595) Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable

2018-11-05 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675891#comment-16675891
 ] 

John Roesler commented on KAFKA-7595:

Not sure if I'm thinking about this properly, but...

For example, say you get some event on the left and another on the right, 
and you want to produce one joined result. Let's say you get the left event first.
 # With *caching disabled*, *whether or not the join is materialized*, I'd 
expect to see one result followed by a second one. It's not clear to me 
if you call this a "duplicate".
 # With *caching enabled* and the *join not materialized*, I'd expect to see 
duplicates: the same result emitted twice.
 ** This could happen if one event gets cached on the left and the other gets 
cached on the right. They only trigger the join upon flush, and they trigger it 
independently, so when the left trigger happens, the right value is already 
visible and vice-versa, hence the duplicates.
 ** Even though the caches try to de-duplicate events, they are mutually 
oblivious (they don't know their results will later be used in a join, and they 
each don't know the other exists), so they can't cooperate to de-duplicate the 
results.
 # If the *join is materialized AND caching is enabled*, then there are still 
"technically" duplicate events, but they get de-duplicated by the join's cache.
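The first case above can be illustrated with a small plain-Java toy model. It is only a sketch under stated assumptions: it does not use Kafka Streams, the class and method names are hypothetical, and it assumes the count side is updated before the sum side for each input record. Each table update triggers the inner join on its own, so a single new rating can produce two downstream updates, the first one computed from a stale sum.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Kafka Streams): two aggregates feeding an inner join, no caching. */
final class UncachedJoinModel {
    private final Map<String, Long> counts = new HashMap<>();
    private final Map<String, Double> sums = new HashMap<>();
    private final List<Double> emitted = new ArrayList<>();

    /** One input record updates both tables; each update triggers the join by itself. */
    void process(String key, double rating) {
        // Assumption: the count side is processed first.
        counts.merge(key, 1L, Long::sum);
        join(key); // triggered by the count update; the sum is still the old one

        sums.merge(key, rating, Double::sum);
        join(key); // triggered by the sum update; both sides are now current
    }

    /** Inner join: emits only when both sides hold a value for the key. */
    private void join(String key) {
        Long count = counts.get(key);
        Double sum = sums.get(key);
        if (count != null && sum != null) {
            emitted.add(sum / count.doubleValue());
        }
    }

    /** Feeds all ratings for one key and returns every join result in emission order. */
    static List<Double> run(String key, double... ratings) {
        UncachedJoinModel model = new UncachedJoinModel();
        for (double rating : ratings) {
            model.process(key, rating);
        }
        return model.emitted;
    }

    public static void main(String[] args) {
        // The second rating emits an intermediate 2.0 (old sum 4.0 / new count 2)
        // before the correct 5.0.
        System.out.println(run("movie-1", 4.0, 6.0)); // prints [4.0, 2.0, 5.0]
    }
}
```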

*So with this in mind, can you clarify which of these you see contradicted?* 

*In particular, I wasn't sure whether you are calling the sequence from my 
first case above a "duplicate" or not. I _think_ this result would be expected 
for your cases 1 and 2.*

 

And just a plug for this new feature: if you'd like to de-duplicate either of 
these sequences without having to materialize the join, as of 2.1 you can use 
the new "suppress" operator: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
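As a rough intuition for what holding back intermediate updates means, here is a minimal plain-Java sketch of a buffer that keeps only the latest value per key and forwards it on flush. It is a toy approximation of the idea, not the suppress operator or its API; the class name and the explicit flush() call are hypothetical stand-ins for whatever limits the real operator is configured with.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not the Kafka Streams suppress operator): latest-value-per-key buffer. */
final class LatestValueBuffer<K, V> {
    // Insertion-ordered, so flushed entries follow the order keys were first seen.
    private final Map<K, V> pending = new LinkedHashMap<>();

    /** Records an update; an earlier pending value for the same key is overwritten. */
    void update(K key, V value) {
        pending.put(key, value);
    }

    /** Emits one entry per key with its latest value, then empties the buffer. */
    List<Map.Entry<K, V>> flush() {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> entry : pending.entrySet()) {
            // Map.entry rejects nulls, which is fine for this non-null demo data.
            out.add(Map.entry(entry.getKey(), entry.getValue()));
        }
        pending.clear();
        return out;
    }

    public static void main(String[] args) {
        LatestValueBuffer<String, Double> buffer = new LatestValueBuffer<>();
        buffer.update("movie-1", 4.0);
        buffer.update("movie-1", 2.0); // intermediate result, later overwritten
        buffer.update("movie-1", 5.0);
        System.out.println(buffer.flush()); // prints [movie-1=5.0]
    }
}
```

Downstream consumers of such a buffer see only the final value per key for each flush interval, at the cost of added latency.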
