hi,
我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。

FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);


org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
 at 
org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
 at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
 at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
 at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
 at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
 at java.lang.Thread.run(Thread.java:748)
 Suppressed: java.lang.NullPointerException

回复