查了些资料,好像说是因为FlinkKafkaProducer.setWriteTimestampToKafka(true);导致的,我使用的是flink1.12.1, 相关代码片段如下,请教是什么原因导致的呢?
//sink Properties producerPro = new Properties(); producerPro.setProperty("bootstrap.servers",kafkaAddr); producerPro.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000"); FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer<String>(dwdOsqueryDetailTopic, new SimpleStringSchema(), producerPro, null, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5); flinkKafkaProducer.setWriteTimestampToKafka(true); beanStr.addSink(flinkKafkaProducer); -- Sent from: http://apache-flink.147419.n8.nabble.com/