Steve Zhou created KAFKA-13290:
----------------------------------
Summary: Last aggregated message of a TimeWindows window is not emitted
until a new message arrives
Key: KAFKA-13290
URL: https://issues.apache.org/jira/browse/KAFKA-13290
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.6.2
Environment: Development
Reporter: Steve Zhou
I have a Streams application that aggregates data over 1-minute windows.
It works as expected while data arrives continuously.
If we stop the producer, however, the last aggregated message is not emitted
until a new message arrives, even if the new message has a different key.
Following is my sample code:

@Bean
public KStream<String, AggregateMetricsFields> kStream(StreamsBuilder streamBuilder) {
    KStream<String, AggregateMetricsFields> aggregatedData = streamBuilder
        .stream(dataTopic, dataConsumed)
        .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
        // windowDuration is 1 minute; late records are accepted for 10 ms
        .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
        .aggregate(this::initialize, this::aggregateFields,
            materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
        // hold back results until the window closes
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
            .withName(windowedFlowSuppressNodeName))
        .toStream()
        // unwrap the windowed key back to the plain record key
        .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
    aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
    return aggregatedData;
}
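
A likely explanation for the behaviour described above: suppress(untilWindowCloses) is driven by stream time, which advances only when new records are processed, not by the wall clock. The sketch below is a toy model in plain Java, not the Kafka Streams API. The class SuppressSketch, its method process, and the "key@windowStart=count" output format are hypothetical names made up for illustration, and a simple count stands in for the real aggregate. It assumes a window is released once stream time reaches window end plus grace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (NOT the Kafka Streams API) of suppress-until-window-closes. */
class SuppressSketch {
    private final long windowSizeMs;
    private final long graceMs;
    // Stream time: the highest record timestamp seen so far. It never
    // moves on its own; only an incoming record can advance it.
    private long streamTimeMs = Long.MIN_VALUE;
    // window start -> (key -> count); the count stands in for the aggregate
    private final TreeMap<Long, Map<String, Integer>> buffered = new TreeMap<>();

    SuppressSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Processes one record and returns the results it releases. */
    List<String> process(String key, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        if (windowStart + windowSizeMs + graceMs <= streamTimeMs) {
            return List.of(); // window already closed: drop the late record
        }
        buffered.computeIfAbsent(windowStart, w -> new TreeMap<>())
                .merge(key, 1, Integer::sum);

        List<String> emitted = new ArrayList<>();
        // Release every buffered window whose end + grace has been reached.
        while (!buffered.isEmpty()
                && buffered.firstKey() + windowSizeMs + graceMs <= streamTimeMs) {
            Map.Entry<Long, Map<String, Integer>> closed = buffered.pollFirstEntry();
            closed.getValue().forEach((k, count) ->
                    emitted.add(k + "@" + closed.getKey() + "=" + count));
        }
        return emitted;
    }

    public static void main(String[] args) {
        SuppressSketch sketch = new SuppressSketch(60_000L, 10L);
        System.out.println(sketch.process("a", 1_000L));   // prints []
        System.out.println(sketch.process("a", 30_000L));  // prints []
        // The producer stops here: nothing is emitted, however long we wait.
        // A later record with a different key advances stream time and
        // releases the first window.
        System.out.println(sketch.process("b", 60_010L));  // prints [a@0=2]
    }
}
```

In this model the result for key "a" stays buffered until the record for key "b" arrives, which matches the report. In a real application stream time is tracked per task, so the new record would presumably need to arrive on the same partition to have this effect.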