hi, 我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id. at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362) at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) at java.lang.Thread.run(Thread.java:748) Suppressed: java.lang.NullPointerException