我按官网操作,重写了序列化方式

val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
props)kafkaSource.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor[MyType] {
    def extractAscendingTimestamp(element: MyType): Long =
element.eventTimestamp})
val stream: DataStream[MyType] = env.addSource(kafkaSource)

*有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢?

回复