[jira] [Commented] (KAFKA-10475) Using same key reports different count of records for groupBy() and groupByKey() in Kafka Streaming Application

2021-08-13 Thread Matthias J. Sax (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398781#comment-17398781 ]

Matthias J. Sax commented on KAFKA-10475:
-

In the end, `groupBy(...)` and `groupByKey()` are not 100% equivalent; rather, 
`groupBy(...)` is equivalent to `selectKey(...).groupByKey()`.

The difference between `groupBy()` and `groupByKey()` (assuming that you are 
effectively not changing the key) is still the repartition topic. Thus, data 
might actually be partitioned differently in the repartition topic than in the 
input topic, as we don't know how data was partitioned in the input topic. Did 
you verify that the partitioning did not change? Also, if you rerun the program 
multiple times, do you always lose the same records, and can you identify them 
individually?
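
To make that equivalence concrete, below is a minimal sketch of the two 
formulations side by side, plus the default partition computation one could use 
to check whether re-keying moved records. The type `InputRecord` and the 
variable `inputRecordSerde` are hypothetical, modeled on the snippet in the 
issue description; this illustrates the statement above, not the reporter's 
actual code.

{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;

public class GroupByEquivalenceSketch {

    // Hypothetical value type mirroring the fields A/B/C from the report.
    static class InputRecord {
        String A, B, C;
    }

    static void illustrate(KStream<String, InputRecord> stream,
                           Serde<InputRecord> inputRecordSerde) {
        // Variant 1: groupBy() with a key selector.
        KGroupedStream<String, InputRecord> viaGroupBy = stream
            .groupBy((key, value) -> value.A + value.B + value.C,
                     Grouped.with(Serdes.String(), inputRecordSerde));

        // Variant 2: the equivalent selectKey().groupByKey().
        KGroupedStream<String, InputRecord> viaSelectKey = stream
            .selectKey((key, value) -> value.A + value.B + value.C)
            .groupByKey(Grouped.with(Serdes.String(), inputRecordSerde));

        // Both variants mark the stream "repartition required", so Streams
        // routes the records through an internal repartition topic before any
        // downstream windowed aggregation.
    }

    // The repartition topic is partitioned by the new key using the default
    // hashing scheme. To check whether partitioning changed, compare this
    // value for the serialized key against the partition the record occupied
    // in the input topic.
    static int defaultPartition(byte[] serializedKey, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }
}
{code}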

> Using same key reports different count of records for groupBy() and 
> groupByKey() in Kafka Streaming Application
> ---
>
> Key: KAFKA-10475
> URL: https://issues.apache.org/jira/browse/KAFKA-10475
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
> Environment: Kafka Cluster:
> Kafka Version: kafka_2.12-2.6.0/
> openjdk version "1.8.0_265"
> Kafka Streams:
> Kafka Streams Version: 2.3.0
> openjdk version "11.0.8"
>Reporter: Saad Rasool
>Assignee: Divya Guduru
>Priority: Major
>
>  
> We are experiencing what amounts to “lost packets” in our stream processing 
> when we use custom grouping keys via groupBy(). We have a single processor 
> node, with a source topic from which we read packets, group and aggregate 
> them, and produce output based on a computation that requires access to a 
> state store.
>  
> Below are more details of the problem and how we have tried to understand it 
> so far:
> *Overview* We are setting up a Kafka Streams application in which we have to 
> perform windowed operations. We are grouping devices based on a specific key. 
> Following are the sample columns we are using for GroupBy:
>  
> ||Field Name ||Field Value||
> |A|12|
> |B|abc|
> |C|x13|
>  
> Sample key based on the above data: 12abcx13, where key = Field(A) + 
> Field(B) + Field(C).
> *Problem* We get a different count of records for the same key in two 
> scenarios: (1) when we specify the key ourselves using groupBy(), and (2) 
> when we use groupByKey() to group the data on the input Kafka topic's 
> partitioning key.
> *Description* We first used the groupBy() function of Kafka Streams to group 
> the devices using the key above. In this case, the Streams application 
> dropped several records and produced fewer records than expected. However, 
> when we did not specify our own custom grouping with groupBy(), and instead 
> used groupByKey() to keep the data keyed on the original incoming Kafka 
> partition key, we got exactly the number of records we expected.
> To check that our custom groupBy() key was exactly the same as the input 
> topic key, we compared both keys within the code; the input topic key and 
> the custom key were identical.
> We have therefore concluded that there is some internal behavior of the 
> groupBy() function that we do not understand, which causes groupBy() and 
> groupByKey() to report different counts for the same key. We have searched 
> multiple forums but could not find the reason for this behavior.
> *Code Snippet:*
> With groupByKey():
>  
> {code:java}
> KStream myStream = this.stream
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
>         .advanceBy(Duration.ofMinutes(advanceByTime))
>         .grace(Duration.ofSeconds(gracePeriod)))
>     .reduce((value1, value2) -> value2) // keep only the latest record per window and key
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     // transform() takes a TransformerSupplier; myTransformer is assumed to
>     // implement it, and this.store.name() attaches the state store
>     .transform(new myTransformer(this.store.name()), this.store.name());
> {code}
>  
>   
> With groupBy():
>   
> {code:java}
> KStream myStream = this.stream
>     // re-key on the concatenated fields; this triggers an internal repartition topic
>     .groupBy((key, value) -> value.A + value.B + value.C,
>         Grouped.with(Serdes.String(), SerdesCollection.getInputSerdes()))
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
>         .advanceBy(Duration.ofMinutes(advanceByTime))
>         .grace(Duration.ofSeconds(gracePeriod)))
>     .reduce((value1, value2) -> value2) // keep only the latest record per window and key
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .transform(new myTransformer(this.store.name()), this.store.name());
> {code}
>  
>   
> ||*Kafka Cluster Setup*||Value||
> |Number of Nodes|3|
> |CPU Cores|2|
> |RAM|8 GB|
>  
> ||*Streaming Application Setup*||Version||
> |Kafka Streams Version|2.3.0|
> |openjdk version|11.0.8|




[jira] [Commented] (KAFKA-10475) Using same key reports different count of records for groupBy() and groupByKey() in Kafka Streaming Application

2020-09-22 Thread Saad Rasool (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17199858#comment-17199858 ]

Saad Rasool commented on KAFKA-10475:
-

[~bchen225242] I will get back to you with sample data by the end of this week.






[jira] [Commented] (KAFKA-10475) Using same key reports different count of records for groupBy() and groupByKey() in Kafka Streaming Application

2020-09-21 Thread Boyang Chen (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17199689#comment-17199689 ]

Boyang Chen commented on KAFKA-10475:
-

[~saad-rasool] [~guozhang] I don't think we have enough information to 
reproduce this issue. Could you give us a sample setup (the application code, 
the input data, and the expected output) so that we can verify there is indeed 
a problem in groupByKey?
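
For anyone preparing such a reproduction, a minimal sketch using 
TopologyTestDriver from kafka-streams-test-utils might look like the 
following. The topic names "input"/"output", the string serdes, and the 
trivial key selector (standing in for the A+B+C concatenation) are 
placeholders, and the suppress() step is omitted, so this is deliberately 
simpler than the reporter's topology:

{code:java}
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class GroupByCountRepro {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Count records per derived key in one-minute windows; the value is
        // used as the new key, standing in for the A+B+C concatenation.
        builder.<String, String>stream("input")
            .groupBy((key, value) -> value,
                     Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
            .count()
            .toStream()
            .map((windowedKey, count) ->
                     KeyValue.pair(windowedKey.key(), count.toString()))
            .to("output", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-10475-repro");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver =
                 new TopologyTestDriver(builder.build(), props)) {
            ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>("input",
                    new StringSerializer(), new StringSerializer());

            // Pipe several records that resolve to the same derived key.
            driver.pipeInput(factory.create("input", "k1", "12abcx13"));
            driver.pipeInput(factory.create("input", "k2", "12abcx13"));

            // Drain all output; the counts here can be compared against a
            // second topology that uses groupByKey() instead of groupBy().
            ProducerRecord<String, String> record;
            while ((record = driver.readOutput("output",
                    new StringDeserializer(), new StringDeserializer())) != null) {
                System.out.println(record.key() + " -> " + record.value());
            }
        }
    }
}
{code}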






[jira] [Commented] (KAFKA-10475) Using same key reports different count of records for groupBy() and groupByKey() in Kafka Streaming Application

2020-09-15 Thread Guozhang Wang (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196462#comment-17196462 ]

Guozhang Wang commented on KAFKA-10475:
---

[~bchen225242] Could you also help look into whether this is a real bug in 
Streams and, if so, guide [~Divya Guduru] through the PR review and merging 
process?






[jira] [Commented] (KAFKA-10475) Using same key reports different count of records for groupBy() and groupByKey() in Kafka Streaming Application

2020-09-13 Thread Divya Guduru (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195112#comment-17195112 ]

Divya Guduru commented on KAFKA-10475:
--

I would like to work on this issue. Can someone add my name as the assignee?



