To set rowtime watermarks delay of source you can:
val desc = Schema()
  .field("a", Types.INT)
  .field("e", Types.LONG)
  .field("f", Types.STRING)
  .field("t", Types.SQL_TIMESTAMP)      
.rowtime(Rowtime().timestampsFromField("t").watermarksPeriodicBounded(1000))
Use watermarksPeriodicBounded api to set delay.
And then XXTableFactory.createStreamTableSource(desc.toProperties)


------------------------------------------------------------------
From:JingsongLee <lzljs3620...@aliyun.com>
Send Time:2019年4月16日(星期二) 17:09
To:Lasse Nedergaard <lassenederga...@gmail.com>; user <user@flink.apache.org>
Subject:回复:Is it possible to handle late data when using table API?

Hi @Lasse Nedergaard, Table API don't have allowedLateness api.
But you can set rowtime.watermarks.delay of source to slow down the watermark 
clock.


------------------------------------------------------------------
发件人:Lasse Nedergaard <lassenederga...@gmail.com>
发送时间:2019年4月16日(星期二) 16:20
收件人:user <user@flink.apache.org>
主 题:Is it possible to handle late data when using table API?

Hi.

I have a simple tumble window working on eventtime like this.

Table res30MinWindows = machineInsights
        .window(Tumble.over("30.minutes").on("UserActionTime").as("w")) // 
define window
        .groupBy("machineId, machineInsightId, w") // group by key and window
        .select("machineId, machineInsightId, w.start, w.end, w.rowtime, 
value.max as max"); // access window properties and aggregate
As we work with Iot units we don't have 100% control over the eventtime 
reported and therefore need to handle late data to ensure that we don't do our 
calculation wrong.
I would like to know if there is any option in the Table API to get access to 
late data, or my only option is to use Streaming API?
Thanks in advance
Lasse Nedergaard


Reply via email to