[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16801566#comment-16801566 ]

Andrew Klopper commented on KAFKA-7895:
---------------------------------------

Hi John Roesler

To answer your first question, I have a shutdown hook in place that seems to be 
working correctly according to the slf4j logs (the application changes state 
from RUNNING -> PENDING_SHUTDOWN -> NOT_RUNNING).
{code:java}
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
{code}
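In case it helps, this is the latch-based variant of that hook I could switch to, so the JVM is guaranteed to wait for close() to finish before exiting. It is only a sketch: FakeStreams is a stand-in for KafkaStreams so the pattern can be shown without a broker, and in the real app the hook thread would be registered via Runtime.getRuntime().addShutdownHook().
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of a latch-based shutdown: the main thread parks on a latch that the
// shutdown hook counts down only after close() has completed, so the JVM
// cannot exit mid-close. FakeStreams is a stand-in for KafkaStreams.
public class ShutdownLatchSketch {

    static class FakeStreams {
        volatile boolean closed = false;
        void close() { closed = true; } // real code: streams.close(Duration.ofSeconds(30))
    }

    public static boolean demo() {
        FakeStreams streams = new FakeStreams();
        CountDownLatch latch = new CountDownLatch(1);
        // Real code registers this thread via Runtime.getRuntime().addShutdownHook(...)
        Thread hook = new Thread(() -> {
            streams.close();   // block until the streams instance is fully closed
            latch.countDown(); // only then let the main thread proceed to exit
        });
        hook.start(); // simulate the JVM firing the hook on Ctrl-C
        try {
            return latch.await(5, TimeUnit.SECONDS) && streams.closed;
        } catch (InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
{code}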
I do have exactly-once configured. My settings are as follows (commit interval 
of 10s):
{code:java}
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appConfig.getApplicationID());
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
    appConfig.getBootstrapServerList().stream().map(HostPort::toString).collect(Collectors.joining(",")));
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, appConfig.getStateDirectory());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
    appConfig.getCommitIntervalDuration().toMillis());
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    com.booking.infra.rollup.kafka.TimestampedValueTimestampExtractor.class);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
    StreamsConfig.EXACTLY_ONCE);
{code}
When I test, I start with a fresh Docker-based Kafka cluster and delete the 
application state directory. I feed the app enough input to generate one or 
two commits and then hit Ctrl-C, which appears to shut everything down 
correctly. I then restart the app with the state directory left untouched. My 
code does not call streams.cleanUp(). Almost without fail this produces 
repeated entries for the same key/window. Sometimes the repeated entries 
differ, in which case the repeat matches an older entry in the source 
KTable.

I do not have to stress the app in any way to cause this, which makes me wonder 
if I am doing something fundamentally wrong.
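For reference, this is the mental model I am comparing the output against: a toy buffer (my own sketch, not the real Streams implementation) that keeps only the latest aggregate per key/window and emits it exactly once when stream time passes the window end plus grace. If my understanding here is wrong, please correct me.
{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of untilWindowCloses semantics: buffer the latest count per
// key/window and emit exactly one record once stream time >= windowEnd + grace.
public class SuppressSketch {

    // events: {timestamp, key} pairs; returns the emitted "key@windowStart, count" records
    public static List<String> run(long[][] events, long windowSize, long grace) {
        Map<String, Long> buffer = new LinkedHashMap<>(); // "key@windowStart" -> count
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long[] e : events) {
            long ts = e[0];
            long key = e[1];
            streamTime = Math.max(streamTime, ts); // stream time only moves forward
            long windowStart = ts / windowSize * windowSize;
            buffer.merge(key + "@" + windowStart, 1L, Long::sum);
            flushClosed(buffer, emitted, streamTime, windowSize, grace);
        }
        return emitted;
    }

    private static void flushClosed(Map<String, Long> buffer, List<String> emitted,
                                    long streamTime, long windowSize, long grace) {
        Iterator<Map.Entry<String, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> en = it.next();
            long windowStart = Long.parseLong(en.getKey().split("@")[1]);
            if (streamTime >= windowStart + windowSize + grace) {
                emitted.add(en.getKey() + ", " + en.getValue()); // final result, exactly once
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        // two updates to key 131 in window [0, 60000): only the final count of 2 is emitted
        long[][] events = {{0, 131}, {10, 131}, {60_010, 9}};
        System.out.println(run(events, 60_000, 5)); // prints [131@0, 2]
    }
}
{code}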

Here is an example of the repeated output:
{noformat}
0.0026: 1553591980510: monitors.group1:stat.stat1: time=1553591980510 
wstart=1553591980000 wsize=1000 count=12 sum=1044 sumSq=91400 min=76 max=98 
last=98
0.0041: 1553591980057: monitors.group1:stat.stat1: time=1553591980057 
wstart=1553591980000 wsize=1000 count=2 sum=154 sumSq=11860 min=76 max=78 
last=78
{noformat}
And here are the corresponding entries in the KTable changelog:
{noformat}
0.0026: 1553591980057: 
[monitors.group1:stat.stat1@1553591980000/1553591981000]: time=1553591980057 
wstart=1553591980000 wsize=1000 count=2 sum=154 sumSq=11860 min=76 max=78 
last=78
0.0028: 1553591987079: 
[monitors.group1:stat.stat1@1553591980000/1553591981000]: time=1553591980510 
wstart=1553591980000 wsize=1000 count=12 sum=1044 sumSq=91400 min=76 max=98 
last=98
{noformat}
Please let me know if you need any more information, or if you need me to run 
any more tests.

Regards
 Andrew

> KTable suppress operator emitting more than one record for the same key per 
> window
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-7895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7895
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0, 2.1.1
>            Reporter: prasanthi
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.2.0, 2.1.2
>
>
> Hi, we are using Kafka Streams to get the aggregated counts per vendor (key) 
> within a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>      .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>      .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>      .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window, as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162
> [KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584
> [KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107
> [KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315
> [KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
