我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.

Best,
Feng

On Tue, Feb 20, 2024 at 4:35 PM casel.chen <casel_c...@126.com> wrote:

> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>
>
> public class XxxSinkFunction extends RichSinkFunction<RowData> implements
> CheckpointedFunction, CheckpointListener {
>
>
>     @Override
>     public synchronized void invoke(RowData rowData, Context context)
> throws IOException {
>            //  这里想从rowData中获取event time和watermark值,如何实现呢?
>     }
> }
>
>
> 例如source table如下定义
>
>
>     CREATE TEMPORARY TABLE source_table(
>   username varchar,
>   click_url varchar,
>   eventtime varchar,
>
>   ts AS TO_TIMESTAMP(eventtime),
>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
> ) with (
>   'connector'='kafka',
>   ...
>
> );
>
>
> CREATE TEMPORARY TABLE sink_table(
>   username varchar,
>   click_url varchar,
>   eventtime varchar
> ) with (
>   'connector'='xxx',
>   ...
> );
> insert into sink_table select username,click_url,eventtime from
> source_table;

回复