Hi Kristoff, please check my SO comment and reply.
https://stackoverflow.com/questions/59570445/late-outputs-missing-for-flinks-session-window/59642942#59642942

It's not entirely clear to me why it's not working, but I also don't quite
understand your use case yet (data examples are missing).

Best,
Arvid

On Fri, Jan 3, 2020 at 1:03 PM KristoffSC <krzysiek.chmielew...@gmail.com> wrote:

> After following the suggestion from SO
> I added a few changes, so now I'm using Event Time.
> Watermarks are progressing; I've checked them in Flink's metrics. The
> Window operator is triggered, but I still don't see any late outputs for
> this.
>
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
> env.setParallelism(1);
> env.disableOperatorChaining();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000);
>
> DataStream<RawMessage> rawBusinessTransaction = env
>         .addSource(new FlinkKafkaConsumer<>("business",
>                 new JSONKeyValueDeserializationSchema(false),
>                 properties))
>         .map(new KafkaTransactionObjectMapOperator())
>         .assignTimestampsAndWatermarks(
>                 new AssignerWithPeriodicWatermarks<RawMessage>() {
>
>             @Nullable
>             @Override
>             public Watermark getCurrentWatermark() {
>                 return new Watermark(System.currentTimeMillis());
>             }
>
>             @Override
>             public long extractTimestamp(RawMessage element,
>                     long previousElementTimestamp) {
>                 return element.messageCreationTime;
>             }
>         })
>         .name("Kafka Transaction Raw Data Source.");
>
> messageStream
>         .keyBy(tradeKeySelector)
>         .window(EventTimeSessionWindows.withDynamicGap(
>                 new TradeAggregationGapExtractor()))
>         .sideOutputLateData(lateTradeMessages)
>         .process(new CumulativeTransactionOperator())
>         .name("Aggregate Transaction Builder");
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
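
One detail in the quoted code that may be worth a second look: getCurrentWatermark() returns System.currentTimeMillis(), while extractTimestamp() returns element.messageCreationTime, so the watermark and the event timestamps come from two different clocks. I can't say whether that is related to the missing late outputs. For comparison, here is a minimal plain-Java sketch of a watermark derived from the event timestamps themselves. It deliberately uses no Flink classes; the class name, the method names, and the 2-second out-of-orderness bound are assumptions made up for this illustration, and the lateness check is a simplification (Flink's window operator also considers the window end and the allowed lateness).

```java
// Illustration only: plain Java, no Flink API. All names here are hypothetical.
final class EventTimeWatermarkSketch {

    // Assumed upper bound on how far out of order events may arrive.
    private final long maxOutOfOrdernessMs;

    // Highest event timestamp observed so far.
    private long maxSeenTimestampMs = Long.MIN_VALUE;

    EventTimeWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Record the timestamp of an incoming event.
    void observe(long eventTimestampMs) {
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, eventTimestampMs);
    }

    // The watermark trails the highest observed event timestamp by the bound,
    // so it only advances when events advance (never with the wall clock).
    long currentWatermark() {
        if (maxSeenTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events observed yet
        }
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }

    // Simplified check: is this timestamp at or behind the current watermark?
    boolean isBehindWatermark(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        EventTimeWatermarkSketch wm = new EventTimeWatermarkSketch(2_000L);
        wm.observe(10_000L);
        wm.observe(12_000L);
        System.out.println("watermark = " + wm.currentWatermark());
        System.out.println("9000 behind watermark?  " + wm.isBehindWatermark(9_000L));
        System.out.println("11000 behind watermark? " + wm.isBehindWatermark(11_000L));
    }
}
```

With a wall-clock watermark, by contrast, whether an element counts as late depends on how old messageCreationTime is relative to the machine's current time, which is why a few sample records with their timestamps would help here.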