[
https://issues.apache.org/jira/browse/KAFKA-13290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Steve Zhou updated KAFKA-13290:
-------------------------------
Description:
I have Kafka Streams event-processing code that aggregates data over 1-minute windows.
It works as expected as long as data arrives continuously.
If we stop the producer, the last aggregated message is not emitted
until a new message arrives.
Following is my sample code:

@Bean
public KStream<String, AggregateMetricsFields> kStream(StreamsBuilder streamBuilder) {
    KStream<String, AggregateMetricsFields> aggregatedData = streamBuilder
        .stream(dataTopic, dataConsumed)
        .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
        // Tumbling windows of windowDuration with a 10 ms grace period
        .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
        .aggregate(this::initialize, this::aggregateFields,
            materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
        // Hold back results until the window has closed
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
            .withName(windowedSuppressNodeName))
        // Replace the windowed key with the plain record key
        .toStream()
        .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
    aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
    return aggregatedData;
}
was:
I have stream code that aggregates data over 1-minute windows.
It works as expected as long as data arrives continuously.
If we stop the producer, the last aggregated message is not emitted
until a new message arrives, even if the new message has a different key.
Following is my sample code:

@Bean
public KStream<String, AggregateMetricsFields> kStream(StreamsBuilder streamBuilder) {
    KStream<String, AggregateMetricsFields> aggregatedData = streamBuilder
        .stream(dataTopic, dataConsumed)
        .groupByKey(Grouped.with(stringSerde, aggregateValueSerde))
        .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
        .aggregate(this::initialize, this::aggregateFields,
            materializedAsWindowStore(windowedStoreName, stringSerde, AggregateMetricsFieldsSerde))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
            .withName(windowedSuppressNodeName))
        .toStream()
        .map((key, aggregateMetrics) -> KeyValue.pair(key.key(), aggregateMetrics));
    aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
    return aggregatedData;
}
> My timeWindows last aggregated message never emit until a new message coming
> -----------------------------------------------------------------------------
>
> Key: KAFKA-13290
> URL: https://issues.apache.org/jira/browse/KAFKA-13290
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.2
> Environment: Development
> Reporter: Steve Zhou
> Priority: Major
>
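The reported behaviour is consistent with how suppression tracks time. Suppressed.untilWindowCloses is driven by stream time, which advances only when new records arrive, not by the wall clock. A window counts as closed once stream time reaches the window end plus the grace period, so with no new input the last window stays buffered. The sketch below is a simplified plain-Java model of that rule, not Kafka Streams' actual implementation. The class SuppressSketch and its process method are hypothetical names, a simple count stands in for the reporter's aggregateFields logic, and timestamps are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of stream-time-driven window suppression (hypothetical, not Kafka code). */
public class SuppressSketch {
    private final long windowSizeMs;
    private final long graceMs;
    // Highest record timestamp seen so far; shared by all keys, like stream time in one task.
    private long streamTimeMs = Long.MIN_VALUE;
    // Buffered counts: window start -> (key -> count), ordered by window start.
    private final TreeMap<Long, TreeMap<String, Long>> buffer = new TreeMap<>();

    public SuppressSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Processes one record and returns the window results that this record releases. */
    public List<String> process(String key, long timestampMs) {
        // Stream time only moves forward, and only when a record arrives.
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);

        // Records for windows that are already closed are dropped as late.
        if (windowStart + windowSizeMs + graceMs > streamTimeMs) {
            buffer.computeIfAbsent(windowStart, s -> new TreeMap<>())
                  .merge(key, 1L, Long::sum);
        }

        // Release every buffered window whose end + grace is at or before stream time.
        List<String> released = new ArrayList<>();
        while (!buffer.isEmpty()
                && buffer.firstKey() + windowSizeMs + graceMs <= streamTimeMs) {
            Map.Entry<Long, TreeMap<String, Long>> closed = buffer.pollFirstEntry();
            closed.getValue().forEach((k, count) ->
                    released.add(k + "@" + closed.getKey() + "=" + count));
        }
        return released;
    }

    public static void main(String[] args) {
        SuppressSketch sketch = new SuppressSketch(60_000L, 10L);
        System.out.println(sketch.process("a", 1_000L));   // []
        System.out.println(sketch.process("a", 59_000L));  // []
        // A record with a different key still advances stream time past 60_010 ms.
        System.out.println(sketch.process("b", 60_010L));  // [a@0=2]
    }
}
```

In this model the result for key "a" is released only by the later record for key "b". If the producer stops after the second record, nothing advances stream time and the window result stays in the buffer, which matches the symptom in the report.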