After following a suggestion from SO, I made a few changes, so I'm now using
event time. Watermarks are progressing (I've checked them in Flink's metrics)
and the window operator is triggered, but I still don't see any late data in
the side output.


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);


DataStream<RawMessage> rawBusinessTransaction = env
        .addSource(new FlinkKafkaConsumer<>("business",
                new JSONKeyValueDeserializationSchema(false), properties))
        .map(new KafkaTransactionObjectMapOperator())
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(System.currentTimeMillis());
            }

            @Override
            public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
                return element.messageCreationTime;
            }
        })
        .name("Kafka Transaction Raw Data Source.");
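
For comparison, watermarks in event-time jobs are commonly derived from the largest event timestamp seen so far, whereas getCurrentWatermark() above returns the wall clock. Below is a minimal plain-Java sketch of that idea with no Flink dependency. The class name, the method names and the 5-second out-of-orderness bound are assumptions made for illustration and are not part of the job.

```java
/** Hypothetical helper: derives a watermark from the highest event timestamp seen so far. */
final class EventTimeWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    EventTimeWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Would be called once per element, e.g. from extractTimestamp(). */
    long observe(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    /** Highest timestamp seen minus the tolerated disorder; Long.MIN_VALUE before any element. */
    long currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        EventTimeWatermarkSketch wm = new EventTimeWatermarkSketch(5_000L); // assumed bound
        wm.observe(100_000L);
        wm.observe(90_000L); // out of order: does not move the watermark backwards
        System.out.println(wm.currentWatermark()); // prints 95000
    }
}
```

With this scheme the watermark only advances when newer events arrive, so it stays on the same time axis as messageCreationTime.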

messageStream
        .keyBy(tradeKeySelector)
        .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
        .sideOutputLateData(lateTradeMessages)
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");
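
To make "late" concrete: as far as I understand Flink's window operator, an element that opens a new session [ts, ts + gap) only reaches the late-data side output if the last timestamp of that window, plus any allowed lateness, is already at or behind the current watermark. Below is a minimal plain-Java sketch of that check, with no Flink dependency. The class and method names and all numbers are hypothetical, and merging with an already open session is not modelled.

```java
/** Hypothetical model of the lateness check for an element that opens a new session window. */
final class SessionLatenessSketch {

    /**
     * Assumed rule: the new session covers [ts, ts + gap), so its last timestamp is
     * ts + gap - 1. The element counts as late when that timestamp plus the allowed
     * lateness is less than or equal to the current watermark.
     */
    static boolean isLateForNewSession(long ts, long gapMs, long allowedLatenessMs, long watermark) {
        long windowMaxTimestamp = ts + gapMs - 1;
        return windowMaxTimestamp + allowedLatenessMs <= watermark;
    }

    public static void main(String[] args) {
        long gap = 10_000L;        // assumed session gap
        long watermark = 100_000L; // assumed current watermark

        // 95000 + 10000 - 1 = 104999 > 100000, so the element is on time
        System.out.println(isLateForNewSession(95_000L, gap, 0L, watermark));      // prints false
        // 80000 + 10000 - 1 = 89999 <= 100000, so the element is late
        System.out.println(isLateForNewSession(80_000L, gap, 0L, watermark));      // prints true
        // 15 s of allowed lateness moves the cutoff to 104999, so it is on time again
        System.out.println(isLateForNewSession(80_000L, gap, 15_000L, watermark)); // prints false
    }
}
```

Note that the side output itself is read from the stream returned by process(), via getSideOutput(lateTradeMessages). The snippet above does not show that call.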






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
