Hi Kristoff,

Please check my SO comment and reply:

https://stackoverflow.com/questions/59570445/late-outputs-missing-for-flinks-session-window/59642942#59642942

It's not entirely clear to me why it's not working, but I also don't quite
understand your use case yet (data examples are missing).

Best,

Arvid

On Fri, Jan 3, 2020 at 1:03 PM KristoffSC <krzysiek.chmielew...@gmail.com>
wrote:

> After following the suggestion from SO, I made a few changes, so now I'm
> using event time. Watermarks are progressing (I've checked them in Flink's
> metrics) and the window operator is triggered, but I still don't see any
> late outputs.
>
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
> env.setParallelism(1);
> env.disableOperatorChaining();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000);
>
>
> DataStream<RawMessage> rawBusinessTransaction = env
>         .addSource(new FlinkKafkaConsumer<>("business",
>                 new JSONKeyValueDeserializationSchema(false), properties))
>         .map(new KafkaTransactionObjectMapOperator())
>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {
>
>             @Nullable
>             @Override
>             public Watermark getCurrentWatermark() {
>                 return new Watermark(System.currentTimeMillis());
>             }
>
>             @Override
>             public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
>                 return element.messageCreationTime;
>             }
>         })
>         .name("Kafka Transaction Raw Data Source.");
>
> messageStream
>         .keyBy(tradeKeySelector)
>         .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
>         .sideOutputLateData(lateTradeMessages)
>         .process(new CumulativeTransactionOperator())
>         .name("Aggregate Transaction Builder");
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
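
For reference, the quoted assigner emits wall-clock time as the watermark
while the timestamps come from messageCreationTime. Below is a minimal
sketch in plain Java (no Flink dependency) of how lateness could be reasoned
about in that setup. It is a simplified model, not Flink's implementation:
it assumes an element counts as late once the watermark has reached the last
timestamp of its own session window plus the allowed lateness, and it
ignores merging with already open sessions. The class and method names
(LateDataSketch, BoundedWatermark, isLate) and the 5 s / 10 s values are
hypothetical and only for illustration.

```java
public class LateDataSketch {

    /** Watermark that trails the highest event timestamp seen so far by a fixed bound. */
    public static final class BoundedWatermark {
        private final long maxOutOfOrdernessMs;
        private long maxSeenTimestamp = Long.MIN_VALUE;

        public BoundedWatermark(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        }

        /** Record an event timestamp; the watermark never moves backwards. */
        public void onEvent(long eventTimestampMs) {
            maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestampMs);
        }

        /** Current watermark, or Long.MIN_VALUE before the first event. */
        public long current() {
            return maxSeenTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : maxSeenTimestamp - maxOutOfOrdernessMs;
        }
    }

    /**
     * Simplified lateness rule (an assumption of this sketch): the element's own
     * session window is [eventTs, eventTs + gap), and it is late when the
     * watermark has reached the window's last timestamp plus allowed lateness.
     */
    public static boolean isLate(long eventTs, long gapMs, long allowedLatenessMs, long watermark) {
        long windowMaxTimestamp = eventTs + gapMs - 1;
        return windowMaxTimestamp + allowedLatenessMs <= watermark;
    }

    public static void main(String[] args) {
        long gapMs = 10_000L; // illustrative session gap

        // Event-time-driven watermark: trails the newest event by 5 seconds.
        BoundedWatermark wm = new BoundedWatermark(5_000L);
        wm.onEvent(100_000L);
        System.out.println("event-time watermark: " + wm.current());
        System.out.println("event at 80000 late? " + isLate(80_000L, gapMs, 0L, wm.current()));
        System.out.println("event at 90000 late? " + isLate(90_000L, gapMs, 0L, wm.current()));

        // Wall-clock watermark, as in the quoted assigner: lateness now depends on
        // how old the event is relative to the machine clock, not to other events.
        long wallClockWatermark = System.currentTimeMillis();
        long recentEvent = wallClockWatermark - 1_000L;
        long oldEvent = wallClockWatermark - 60_000L;
        System.out.println("recent event late? " + isLate(recentEvent, gapMs, 0L, wallClockWatermark));
        System.out.println("old event late?    " + isLate(oldEvent, gapMs, 0L, wallClockWatermark));
    }
}
```

Under this model, whether anything reaches the late side output with a
wall-clock watermark depends on how messageCreationTime plus the session gap
compares to the processing machine's clock, which is why concrete data
examples would help to narrow this down.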
