How to measure end-to-end latency in a Flink SQL job
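A Flink SQL job consumes canal-json messages from Kafka that originate from MySQL, applies complex processing, and writes the results to Doris. How can I measure the end-to-end latency of the records in the Doris table? The MySQL table has an update_time field that holds the time the business record was updated.

Doris can add an update-time column, ingest_time, to the table schema, so on the Doris side the end-to-end latency can be computed as ingest_time - update_time. That approach only supports offline statistics, though. Is there a way to measure it in real time, so that it can be used for real-time monitoring?

I looked at the invoke method of the SinkFunction class. It takes a Context parameter that exposes the current processing time and the event time, but most sinks accumulate micro-batches and write them in bulk, so the difference between those two times does not represent the actual latency of landing in the database. Is there a way to get the latency precisely?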
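One possible direction, not confirmed anywhere in this thread: since the sink batches writes, the latency could be taken when a batch flush is acknowledged rather than when invoke is called. The sketch below is plain Java with no Flink dependency. FlushLatencyTracker, onBuffered, and onFlushSucceeded are hypothetical names, and the injected clock stands in for System.currentTimeMillis(). It also assumes update_time is available as epoch milliseconds and that the MySQL host and the Flink workers have reasonably synchronized clocks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical helper; not part of Flink or of any Doris connector. */
final class FlushLatencyTracker {
    private final List<Long> pendingUpdateTimes = new ArrayList<>();
    private final LongSupplier clock;

    FlushLatencyTracker(LongSupplier clock) {
        this.clock = clock;
    }

    /** Call when a record is added to the in-memory batch. */
    void onBuffered(long updateTimeMillis) {
        pendingUpdateTimes.add(updateTimeMillis);
    }

    /**
     * Call only after the batch write has been acknowledged.
     * Returns one latency per flushed record and clears the buffer.
     */
    long[] onFlushSucceeded() {
        long now = clock.getAsLong();
        long[] latencies = new long[pendingUpdateTimes.size()];
        for (int i = 0; i < latencies.length; i++) {
            latencies[i] = now - pendingUpdateTimes.get(i);
        }
        pendingUpdateTimes.clear();
        return latencies;
    }
}

final class FlushLatencyDemo {
    public static void main(String[] args) {
        long[] fakeNow = {1_000L};
        FlushLatencyTracker tracker = new FlushLatencyTracker(() -> fakeNow[0]);

        tracker.onBuffered(400L);  // record updated at t=400
        tracker.onBuffered(900L);  // record updated at t=900

        fakeNow[0] = 1_500L;       // the batch is acknowledged at t=1500
        System.out.println(Arrays.toString(tracker.onFlushSucceeded())); // [1100, 600]
    }
}
```

In a real sink the returned values would presumably be fed into a metric (for example a histogram or a gauge holding the maximum) rather than printed, and the result still excludes anything that happens inside Doris after the write is acknowledged.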
Re:Re:Re: How can a custom sink connector in Flink SQL access the event time and watermark defined in the source table?
Thanks! So to compute the sink latency, is it enough to use context.currentProcessingTime() - context.timestamp()?

I see that SinkWriter.Context in the new Sink V2 no longer has a currentProcessingTime() method. Can I simply use System.currentTimeMillis() - context.timestamp() to get the sink latency instead?

On 2024-02-21 09:41:37, "Xuyang" wrote:
> Hi, chen.
> You could try the following in the sink function's invoke method:
>
>     @Override
>     public void invoke(RowData row, Context context) throws Exception {
>         context.currentProcessingTime();
>         context.currentWatermark();
>         ...
>     }
>
> --
> Best!
> Xuyang
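A minimal sketch of the subtraction asked about above, written as plain Java so it runs without Flink. SinkLatency and millis are hypothetical names. As far as I know, timestamp() on both SinkFunction.Context and SinkWriter.Context returns a boxed Long that is null when the record carries no timestamp, so the sketch guards against that case instead of unboxing blindly. In a real writer, nowMillis would come from context.currentProcessingTime() or System.currentTimeMillis().

```java
import java.util.OptionalLong;

/** Hypothetical helper; not a Flink API. */
final class SinkLatency {
    private SinkLatency() {}

    /**
     * Returns nowMillis - recordTimestamp, or empty when the record has no timestamp.
     * recordTimestamp is boxed on purpose, to mirror a nullable context.timestamp().
     */
    static OptionalLong millis(Long recordTimestamp, long nowMillis) {
        if (recordTimestamp == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(nowMillis - recordTimestamp);
    }
}

final class SinkLatencyDemo {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(SinkLatency.millis(now - 250L, now)); // OptionalLong[250]
        System.out.println(SinkLatency.millis(null, now));       // OptionalLong.empty
    }
}
```

Note that this value only covers the time until the record reaches the sink writer. For a sink that buffers micro-batches it does not include the time the record waits before the batch is actually written.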
Re:Re: How can a custom sink connector in Flink SQL access the event time and watermark defined in the source table?
Hi, chen.

You could try the following in the sink function's invoke method:

    @Override
    public void invoke(RowData row, Context context) throws Exception {
        context.currentProcessingTime();  // current processing time
        context.currentWatermark();       // current event-time watermark
        ...
    }

--
Best!
Xuyang

On 2024-02-20 19:38:44, "Feng Jin" wrote:
> My understanding is that these should not be read from rowData; the watermark and eventTime are available through the Context.
>
> Best,
> Feng
Re: How can a custom sink connector in Flink SQL access the event time and watermark defined in the source table?
My understanding is that these should not be read from rowData; the watermark and eventTime are available through the Context.

Best,
Feng

On Tue, Feb 20, 2024 at 4:35 PM casel.chen wrote:
> How can a custom sink connector in Flink SQL access the event time and watermark defined in the source table?
How can a custom sink connector in Flink SQL access the event time and watermark defined in the source table?
How can a custom sink connector in Flink SQL access the event time and watermark defined in the source table?

    public class XxxSinkFunction extends RichSinkFunction<RowData> implements
            CheckpointedFunction, CheckpointListener {

        @Override
        public synchronized void invoke(RowData rowData, Context context)
                throws IOException {
            // I want to get the event time and watermark values from rowData here. How can this be done?
        }
    }

For example, the source table is defined as follows:

    CREATE TEMPORARY TABLE source_table(
      username varchar,
      click_url varchar,
      eventtime varchar,

      ts AS TO_TIMESTAMP(eventtime),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- define the watermark on the rowtime column
    ) with (
      'connector'='kafka',
      ...
    );

    CREATE TEMPORARY TABLE sink_table(
      username varchar,
      click_url varchar,
      eventtime varchar
    ) with (
      'connector'='xxx',
      ...
    );

    insert into sink_table select username, click_url, eventtime from source_table;
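To make the watermark clause in the DDL above concrete, here is a simplified plain-Java sketch of what ts - INTERVAL '2' SECOND amounts to: the watermark trails the largest ts seen so far by two seconds and never moves backwards. WatermarkSketch and observe are hypothetical names. The sketch assumes eventtime strings in the yyyy-MM-dd HH:mm:ss form and interprets them as UTC for illustration. Flink's real watermark generation is periodic and tracked per source split, which this ignores.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical illustration of a bounded-delay watermark; not Flink code. */
final class WatermarkSketch {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final long DELAY_MILLIS = 2_000L; // INTERVAL '2' SECOND

    private long maxTimestampMillis = Long.MIN_VALUE;

    /** Parses an eventtime string into epoch milliseconds (UTC assumed). */
    static long toEpochMillis(String eventtime) {
        return LocalDateTime.parse(eventtime, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    /** Records one row's event time and returns the watermark after seeing it. */
    long observe(String eventtime) {
        maxTimestampMillis = Math.max(maxTimestampMillis, toEpochMillis(eventtime));
        return maxTimestampMillis - DELAY_MILLIS;
    }
}

final class WatermarkSketchDemo {
    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch();
        System.out.println(sketch.observe("1970-01-01 00:00:10")); // 8000
        System.out.println(sketch.observe("1970-01-01 00:00:05")); // 8000 (late row, no regression)
        System.out.println(sketch.observe("1970-01-01 00:00:12")); // 10000
    }
}
```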