[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16801566#comment-16801566 ]
Andrew Klopper commented on KAFKA-7895:
---------------------------------------

Hi John Roesler,

To answer your first question, I have a shutdown hook in place that seems to be working correctly according to the slf4j logs (the application changes state from RUNNING -> PENDING_SHUTDOWN -> NOT_RUNNING):
{code:java}
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
{code}
I do have exactly-once configured. My settings are as follows (commit interval of 10s):
{code:java}
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appConfig.getApplicationID());
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getBootstrapServerList().stream().map(HostPort::toString).collect(Collectors.joining(",")));
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, appConfig.getStateDirectory());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, appConfig.getCommitIntervalDuration().toMillis());
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, com.booking.infra.rollup.kafka.TimestampedValueTimestampExtractor.class);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
{code}
When I test, I start with a fresh Docker-based Kafka cluster and delete the application state directory. I feed the app enough input to generate one or two commits and then hit Ctrl-C, which appears to shut everything down correctly. I then restart the app, leaving the state directory untouched. My code does not call streams.cleanUp().

Almost without fail, this produces repeated entries for the same key/window. Sometimes the repeated entries differ, in which case the repeat matches an older entry in the source KTable. I do not have to stress the app in any way to cause this, which makes me wonder if I am doing something fundamentally wrong.

Here is an example of the repeated output:
{noformat}
0.0026: 1553591980510: monitors.group1:stat.stat1: time=1553591980510 wstart=1553591980000 wsize=1000 count=12 sum=1044 sumSq=91400 min=76 max=98 last=98
0.0041: 1553591980057: monitors.group1:stat.stat1: time=1553591980057 wstart=1553591980000 wsize=1000 count=2 sum=154 sumSq=11860 min=76 max=78 last=78
{noformat}
And here are the corresponding entries in the KTable changelog:
{noformat}
0.0026: 1553591980057: [monitors.group1:stat.stat1@1553591980000/1553591981000]: time=1553591980057 wstart=1553591980000 wsize=1000 count=2 sum=154 sumSq=11860 min=76 max=78 last=78
0.0028: 1553591987079: [monitors.group1:stat.stat1@1553591980000/1553591981000]: time=1553591980510 wstart=1553591980000 wsize=1000 count=12 sum=1044 sumSq=91400 min=76 max=98 last=98
{noformat}
Please let me know if you need any more information, or if you need me to run any more tests.

Regards
Andrew
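For reference, here is a minimal sketch of how the two snippets above could be wired together in a main method. The latch-based wait, the class name, and the elided topology are illustrative assumptions rather than the exact application code:
{code:java}
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class RollupApp {

    public static void main(final String[] args) throws InterruptedException {
        final Properties streamsConfiguration = new Properties();
        // ... the settings listed above (application id, bootstrap servers, state dir,
        // commit interval, timestamp extractor) go here ...
        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        final StreamsBuilder builder = new StreamsBuilder();
        // ... windowed aggregation topology elided ...

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        final CountDownLatch latch = new CountDownLatch(1);

        // On Ctrl-C, close the streams instance and only then let main() return, so the
        // RUNNING -> PENDING_SHUTDOWN -> NOT_RUNNING transition completes before the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            latch.countDown();
        }));

        streams.start();
        latch.await();
    }
}
{code}
The only purpose of the latch in this sketch is to keep the main thread alive until the hook has finished closing the streams instance, which is consistent with the clean shutdown transition seen in the logs quoted above.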
> KTable suppress operator emitting more than one record for the same key per window
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-7895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7895
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0, 2.1.1
>            Reporter: prasanthi
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.2.0, 2.1.2
>
>
> Hi, we are using Kafka Streams to get the aggregated counts per vendor (key) within a specified window.
> Here's how we configured the suppress operator to emit one final record per key/window:
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>     .count(Materialized.with(Serdes.Integer(), Serdes.Long()))
>     .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window, as shown below:
> {code:java}
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162
> [KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584
> [KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107
> [KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315
> [KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809
> {code}
> Could you please take a look?
> Thanks
>
> Added by John:
> Acceptance Criteria:
> * add suppress to system tests, such that it's exercised with crash/shutdown recovery, rebalance, etc.
> ** https://github.com/apache/kafka/pull/6278
> * make sure that there's some system test coverage with caching disabled.
> ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
> * test with tighter time bounds, with windows of, say, 30 seconds, and use system time without adding any extra time for verification.
> ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)