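From what I have read, this seems to be caused by FlinkKafkaProducer.setWriteTimestampToKafka(true). I am using Flink 1.12.1.
The relevant code snippet is below. Could someone explain what is causing this?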

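import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

        // sink
        Properties producerPro = new Properties();
        producerPro.setProperty("bootstrap.servers", kafkaAddr);
        // transaction timeout in milliseconds (600000 ms = 10 minutes)
        producerPro.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000");

        // null = no custom partitioner; 5 = Kafka producer pool size
        FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<String>(
                dwdOsqueryDetailTopic, new SimpleStringSchema(), producerPro,
                null, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5);

        // write the timestamp attached to each Flink record into the Kafka record
        flinkKafkaProducer.setWriteTimestampToKafka(true);
        beanStr.addSink(flinkKafkaProducer);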



