Hi marble,
 使用 Datastream 开发的话,Kafka connector 
的使用可参考文献1[1];EventTime以及WaterMark的使用可以参考文献2[2]。
对应的中文文档对应在文献3和4.
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html
[4] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_timestamps_watermarks.html


Best,
Hailong Wang

在 2020-11-03 09:12:42,"[email protected]" 
<[email protected]> 写道:
>你好。
>在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample,
>
>我是这样用的,
>1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>2. 在connector里指定watermark,其中transTime是消息里的字段
>"    rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, 'yyyy-MM-dd
>HH:mm:ss')), \n " +
>"    WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND \n" +
>
>3. 然后直接用datastream的window
>ds.keyBy(marketCode).timeWindow(Time.minutes(1L)); 
>
>但在运行时会报以下exception, 已经 在connector里定义了,还需要assign吗?
>java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>timestamp marker). Is the time characteristic set to 'ProcessingTime', or
>did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>
>即使我在datastream里定义了strategy ,
>ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)));
>
>也还是报上面一样的错。
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/

回复