Following the suggestion from SO, I made a few changes, so I'm now using event time. The watermarks are progressing; I've checked them in Flink's metrics. The window operator is triggered, but I still don't see any late output.
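For context, here is a rough plain-Java sketch of the rule that decides when an event-time window counts as late. It is a simplification, not Flink's actual code: the class and method names are hypothetical, it assumes allowed lateness is left at its default of 0 unless passed explicitly, and it ignores the merging of an element's session window with existing sessions.

```java
// Hypothetical, simplified model of the event-time lateness check.
// Not Flink code: no Flink classes are used and session merging is ignored.
class LatenessSketch {

    /** Largest timestamp that still belongs to a window with exclusive end windowEnd. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** A window is late once the watermark has reached its max timestamp plus allowed lateness. */
    static boolean isWindowLate(long windowEnd, long allowedLateness, long currentWatermark) {
        return maxTimestamp(windowEnd) + allowedLateness <= currentWatermark;
    }

    /**
     * A new element in a session window initially gets the window
     * [timestamp, timestamp + gap); treat it as late if that window is late.
     */
    static boolean isLateForSession(long timestamp, long gap,
                                    long allowedLateness, long currentWatermark) {
        return isWindowLate(timestamp + gap, allowedLateness, currentWatermark);
    }

    public static void main(String[] args) {
        long gap = 5_000L;          // assumed session gap in milliseconds
        long watermark = 100_000L;  // assumed current watermark

        // Element far behind the watermark: its session window has already passed.
        System.out.println(isLateForSession(90_000L, gap, 0L, watermark));
        // Element close to the watermark: its session window is still open.
        System.out.println(isLateForSession(99_000L, gap, 0L, watermark));
        // Same old element, but with 10 seconds of allowed lateness.
        System.out.println(isLateForSession(90_000L, gap, 10_000L, watermark));
    }
}
```

Under this model, an element only goes to the late side output when its timestamp plus the gap is already behind the watermark at the moment it reaches the window operator.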
The job is set up as follows:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
    env.setParallelism(1);
    env.disableOperatorChaining();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000);

    DataStream<RawMessage> rawBusinessTransaction = env
        .addSource(new FlinkKafkaConsumer<>("business",
            new JSONKeyValueDeserializationSchema(false), properties))
        .map(new KafkaTransactionObjectMapOperator())
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(System.currentTimeMillis());
            }

            @Override
            public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
                return element.messageCreationTime;
            }
        })
        .name("Kafka Transaction Raw Data Source.");

    messageStream
        .keyBy(tradeKeySelector)
        .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
        .sideOutputLateData(lateTradeMessages)
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/