[
https://issues.apache.org/jira/browse/KAFKA-10475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Divya Guduru reassigned KAFKA-10475:
------------------------------------
Assignee: Divya Guduru
> Using same key reports different count of records for groupBy() and
> groupByKey() in Kafka Streaming Application
> ---------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-10475
> URL: https://issues.apache.org/jira/browse/KAFKA-10475
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.0
> Environment: Kafka Cluster:
> Kafka Version: kafka_2.12-2.6.0
> openjdk version "1.8.0_265"
> Kafka Streams:
> Kafka Streams Version: 2.3.0
> openjdk version "11.0.8"
> Reporter: Saad Rasool
> Assignee: Divya Guduru
> Priority: Major
>
>
> We are experiencing what amounts to "lost packets" in our stream processing
> when we use custom groupBy() keys. We have a single processor node, with a
> source topic from which we read packets, group them, aggregate each group,
> and produce output based on a computation that requires access to a state
> store.
>
> Below is a more detailed description of the problem and what we have tried
> so far.
> *Overview* We are setting up a Kafka Streams application that performs
> windowed operations. We group devices by a specific key. The following
> sample fields make up the groupBy() key:
>
> ||Field Name ||Field Value||
> |A|12|
> |B|abc|
> |C|x13|
>
> Sample key based on the above data: 12abcx13, where key = Field(A) +
> Field(B) + Field(C).
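> One thing worth checking with this kind of composite key: concatenating the
> fields without a delimiter lets two distinct field combinations collide on
> the same key. A minimal, self-contained sketch (field names and values are
> taken from the table above; the delimiter choice is only an illustration):

```java
public class CompositeKey {
    // Composite key built as in the report: Field(A) + Field(B) + Field(C)
    static String rawKey(String a, String b, String c) {
        return a + b + c;
    }

    // Delimited variant that keeps distinct field combinations distinct
    static String delimitedKey(String a, String b, String c) {
        return a + "|" + b + "|" + c;
    }

    public static void main(String[] args) {
        // ("12","abc","x13") and ("12a","bc","x13") collide without a delimiter
        System.out.println(rawKey("12", "abc", "x13")
                .equals(rawKey("12a", "bc", "x13")));          // prints true
        System.out.println(delimitedKey("12", "abc", "x13")
                .equals(delimitedKey("12a", "bc", "x13")));    // prints false
    }
}
```

> A delimiter keeps distinct combinations distinct, though key collisions alone
> would not explain a count difference between groupBy() and groupByKey() for
> the same key.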
> *Problem* We get a different count of records for the same key in two
> scenarios:
> # when we specify the key ourselves using groupBy(); and
> # when we use groupByKey() to group the data on the input Kafka topic's
> partitioning key.
> *Description* We first used the groupBy() function of Kafka Streams to
> group the devices using the key above. In this case, the streams application
> dropped several records and produced fewer records than expected. However,
> when we did not specify our own custom grouping with groupBy(), and instead
> used groupByKey() to key the data on the original incoming Kafka partition
> key, we got exactly the number of records expected.
> To verify that our custom groupBy() function used exactly the same keys as
> the input topic, we compared both keys within the code. The input topic key
> and the custom key were identical.
> We have therefore concluded that some internal behavior of groupBy() that
> we do not understand causes groupBy() and groupByKey() to report different
> counts for the same key. We have searched multiple forums but have not
> found an explanation for this behavior.
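> One mechanism that could produce this symptom (an assumption, not confirmed
> against this topology): groupBy() always writes through an internal
> repartition topic because the key may change, while groupByKey() with an
> unchanged key does not. Repartitioning can interleave records coming from
> different source partitions, so records may reach the windowed aggregation
> out of timestamp order; with a small grace period, records arriving after
> their window has closed are silently dropped. The following pure-Java
> sketch (no Kafka dependencies; countAccepted is a hypothetical helper)
> shows how the same records can pass or be dropped depending only on
> arrival order:

```java
public class GraceDropSketch {
    // Illustration only: accept each record unless its timestamp is older
    // than (max timestamp observed so far - grace), the way a windowed store
    // with a grace period treats late records.
    static int countAccepted(long[] timestamps, long graceMs) {
        long maxSeen = Long.MIN_VALUE;
        int accepted = 0;
        for (long ts : timestamps) {
            maxSeen = Math.max(maxSeen, ts);
            if (ts >= maxSeen - graceMs) {
                accepted++;
            }
            // else: the record is "late" and silently dropped
        }
        return accepted;
    }

    public static void main(String[] args) {
        long[] inOrder  = {0, 10, 20, 30}; // original partition order
        long[] shuffled = {30, 0, 10, 20}; // a possible order after repartitioning
        System.out.println(countAccepted(inOrder, 5));  // prints 4: nothing is late
        System.out.println(countAccepted(shuffled, 5)); // prints 1: 0, 10, 20 arrive after 30 - grace
    }
}
```

> If this is the cause, increasing the grace() duration, or checking the
> task-level metrics for dropped/late records on the aggregation, should make
> the missing records visible.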
> *Code Snippet:*
> With groupByKey():
>
> {code:java}
> KStream<String, Output> myStream = this.stream
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
>         .advanceBy(Duration.ofMinutes(advanceByTime))
>         .grace(Duration.ofSeconds(gracePeriod)))
>     .reduce((value1, value2) -> value2)
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .transform(new myTransformer(this.store.name()), this.store.name());
> {code}
>
>
> With groupBy():
>
> {code:java}
> KStream<String, Output> myStream = this.stream
>     .groupBy((key, value) -> value.A + value.B + value.C,
>         Grouped.with(Serdes.String(), SerdesCollection.getInputSerdes()))
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
>         .advanceBy(Duration.ofMinutes(advanceByTime))
>         .grace(Duration.ofSeconds(gracePeriod)))
>     .reduce((value1, value2) -> value2)
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .transform(new myTransformer(this.store.name()), this.store.name());{code}
>
>
> ||*Kafka Cluster Setup*||Value||
> |Number of Nodes|3|
> |CPU Cores|2|
> |RAM|8 GB|
>
> ||*Streaming Application Setup*||Version||
> |Kafka Streams Version|2.3.0|
> |openjdk version|11.0.8|
--
This message was sent by Atlassian Jira
(v8.3.4#803005)