上封邮件发错了,重新发一下。项目中使用精准一次语义写入kafka,代码和配置如下:

写入代码如下:
Properties producerProperties = MyKafkaUtil.getProducerProperties();
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(Event2Kafka.parameterTool.get("feature.topic.name"))
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setKafkaProducerConfig(producerProperties)
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("streamx_flow_1261")
.build();

eventJsonStream.sinkTo(kafkaSink).setParallelism(14)

.name("event2kafka").uid("kafkasink");




kafka配置如下:
public static Properties getProducerProperties(){
Properties kafkaProducerProps = new Properties();
kafkaProducerProps.setProperty("bootstrap.servers", 
parameterTool.get(bootstrap.server"));
kafkaProducerProps.setProperty("auto.commit.interval.ms", "5000");
kafkaProducerProps.setProperty("auto.offset.reset", "latest");
kafkaProducerProps.setProperty("session.timeout.ms", "5000");
kafkaProducerProps.setProperty("transaction.timeout.ms",12*60000 +"");
kafkaProducerProps.put("security.protocol", "SASL_PLAINTEXT");
kafkaProducerProps.put("sasl.kerberos.service.name","kafka");

        return kafkaProducerProps;

}


项目运行很久都没啥问题,最近突然报了以下的错误
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka 
xxxx-topic-2@-1 with 
FlinkKafkaInternalProducer{transactionalId='streamx_flow_1261-8-5', 
inTransaction=true, closed=false} 
        at 
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:436)
        at 
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:417)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The 
broker received an out of order sequence number.


参考了stackoverflow上面的回答:https://stackoverflow.com/questions/55192852/transactional-producer-vs-just-idempotent-producer-java-exception-outoforderseq
但是里面涉及到的参数我都没有设置,都是使用默认的配置。照理来说应该不会有这样的问题。想请问下各位有没有什么看法。还是我的配置有啥错误和缺少的地方。

回复