[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reopened KAFKA-7895:
---------------------------------

> Ktable supress operator emitting more than one record for the same key per 
> window
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-7895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7895
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0, 2.1.1
>            Reporter: prasanthi
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.2.0, 2.1.2
>
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>      .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>      .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>      .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162
> [KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584
> [KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107
> [KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315
> [KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to