[ 
https://issues.apache.org/jira/browse/KAFKA-13290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Zhou updated KAFKA-13290:
-------------------------------
    Description: 
I have a Kafka stream event processing code which aggregates 1 minutes data. 

It works as expected if data comes continuously, 

If we stop producer, then i found the last aggregated message does not emit 
until new message coming.

 

Following is my sample code, @Bean
 public KStream<String, AggregateMetricsFields> kStream(StreamsBuilder 
streamBuilder) {

  KStream<String, AggregateMetricsFields> aggregatedData = streamBuilder
               .stream(dataTopic, dataConsumed)
               .groupByKey(Grouped.with(
                             stringSerde,
                             aggregateValueSerde))
               
.windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
              .aggregate(this::initialize, this::aggregateFields,
                               materializedAsWindowStore(windowedStoreName, 
stringSerde,
                               AggregateMetricsFieldsSerde))
            
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
            .withName(windowedSuppressNodeName))
             .toStream().map((key, aggregateMetrics) ->

{            return KeyValue.pair(key.key(), aggregateMetrics);    }

);
 aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
 return aggregatedFlowData;
 }

 

  was:
I have a stream code which aggregate 1 minutes data. 

It works as expected if data comes continuously, 

If we stop producer, then i found the last aggregated message does not emit 
until new message coming, even the new message has different key.

 

Following is my sample code, @Bean
 public KStream<String, AggregateMetricsFields> kStream(StreamsBuilder 
streamBuilder) {

  KStream<String, AggregateMetricsFields> aggregatedData = streamBuilder
               .stream(dataTopic, dataConsumed)
               .groupByKey(Grouped.with(
                             stringSerde,
                             aggregateValueSerde))
               
.windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
              .aggregate(this::initialize, this::aggregateFields,
                               materializedAsWindowStore(windowedStoreName, 
stringSerde,
                               AggregateMetricsFieldsSerde))
            
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
            .withName(windowedSuppressNodeName))
             .toStream().map((key, aggregateMetrics) ->

{            return KeyValue.pair(key.key(), aggregateMetrics);    }

);
 aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
 return aggregatedFlowData;
 }

 


> My timeWindows last aggregated message never emit until a new message coming 
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-13290
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13290
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.2
>         Environment: Development
>            Reporter: Steve Zhou
>            Priority: Major
>
> I have a Kafka stream event processing code which aggregates 1 minutes data. 
> It works as expected if data comes continuously, 
> If we stop producer, then i found the last aggregated message does not emit 
> until new message coming.
>  
> Following is my sample code, @Bean
>  public KStream<String, AggregateMetricsFields> kStream(StreamsBuilder 
> streamBuilder) {
>   KStream<String, AggregateMetricsFields> aggregatedData = streamBuilder
>                .stream(dataTopic, dataConsumed)
>                .groupByKey(Grouped.with(
>                              stringSerde,
>                              aggregateValueSerde))
>                
> .windowedBy(TimeWindows.of(windowDuration).grace(Duration.ofMillis(10L)))
>               .aggregate(this::initialize, this::aggregateFields,
>                                materializedAsWindowStore(windowedStoreName, 
> stringSerde,
>                                AggregateMetricsFieldsSerde))
>             
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
>             .withName(windowedSuppressNodeName))
>              .toStream().map((key, aggregateMetrics) ->
> {            return KeyValue.pair(key.key(), aggregateMetrics);    }
> );
>  aggregatedData.to(aggregatedDataTopic, aggregateDataProduced);
>  return aggregatedFlowData;
>  }
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to