感谢!那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可? 我看新的sink v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis() - context.timestamp()得到sink延迟呢?
在 2024-02-21 09:41:37,"Xuyang" <xyzhong...@163.com> 写道: >Hi, chen. >可以试一下在sink function的invoke函数中使用: > > > @Override > public void invoke(RowData row, Context context) throws Exception { > context.currentProcessingTime(); > context.currentWatermark(); > ... > } > > > > > > > >-- > > Best! > Xuyang > > > > > >在 2024-02-20 19:38:44,"Feng Jin" <jinfeng1...@gmail.com> 写道: >>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime. >> >>Best, >>Feng >> >>On Tue, Feb 20, 2024 at 4:35 PM casel.chen <casel_c...@126.com> wrote: >> >>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark? >>> >>> >>> public class XxxSinkFunction extends RichSinkFunction<RowData> implements >>> CheckpointedFunction, CheckpointListener { >>> >>> >>> @Override >>> public synchronized void invoke(RowData rowData, Context context) >>> throws IOException { >>> // 这里想从rowData中获取event time和watermark值,如何实现呢? >>> } >>> } >>> >>> >>> 例如source table如下定义 >>> >>> >>> CREATE TEMPORARY TABLE source_table( >>> username varchar, >>> click_url varchar, >>> eventtime varchar, >>> >>> ts AS TO_TIMESTAMP(eventtime), >>> WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。 >>> ) with ( >>> 'connector'='kafka', >>> ... >>> >>> ); >>> >>> >>> CREATE TEMPORARY TABLE sink_table( >>> username varchar, >>> click_url varchar, >>> eventtime varchar >>> ) with ( >>> 'connector'='xxx', >>> ... >>> ); >>> insert into sink_table select username,click_url,eventtime from >>> source_table;