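The previous email was sent by mistake, so I am resending it. Our project writes to Kafka with exactly-once semantics; the code and configuration are below.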
The write code is as follows:

    Properties producerProperties = MyKafkaUtil.getProducerProperties();
    KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
            .setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server"))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(Event2Kafka.parameterTool.get("feature.topic.name"))
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setKafkaProducerConfig(producerProperties)
            .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("streamx_flow_1261")
            .build();
    eventJsonStream.sinkTo(kafkaSink).setParallelism(14)
            .name("event2kafka").uid("kafkasink");

The Kafka configuration is as follows:

    public static Properties getProducerProperties() {
        Properties kafkaProducerProps = new Properties();
        kafkaProducerProps.setProperty("bootstrap.servers", parameterTool.get("bootstrap.server"));
        kafkaProducerProps.setProperty("auto.commit.interval.ms", "5000");
        kafkaProducerProps.setProperty("auto.offset.reset", "latest");
        kafkaProducerProps.setProperty("session.timeout.ms", "5000");
        // 12 minutes
        kafkaProducerProps.setProperty("transaction.timeout.ms", 12 * 60000 + "");
        kafkaProducerProps.put("security.protocol", "SASL_PLAINTEXT");
        kafkaProducerProps.put("sasl.kerberos.service.name", "kafka");
        return kafkaProducerProps;
    }

The job ran for a long time without any problems, but recently it suddenly started failing with the following error:

    org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka xxxx-topic-2@-1 with FlinkKafkaInternalProducer{transactionalId='streamx_flow_1261-8-5', inTransaction=true, closed=false}
        at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:436)
        at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:417)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number.

I looked at this StackOverflow answer:
https://stackoverflow.com/questions/55192852/transactional-producer-vs-just-idempotent-producer-java-exception-outoforderseq

However, I have not set any of the parameters it discusses; they are all left at their defaults, so by rights this problem should not occur. Does anyone have any thoughts on this? Or is something wrong with, or missing from, my configuration?
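
For reference, here is a minimal sketch of what pinning the idempotence-related producer settings explicitly, instead of relying on defaults, might look like. It uses only java.util.Properties; the class and method names are hypothetical and are not part of my job. The three values are, as far as I understand, what a transactional Kafka producer requires anyway (idempotence on, acks=all, at most 5 in-flight requests per connection), so this should not change behaviour. It only rules out an accidental override.

```java
import java.util.Properties;

// Hypothetical helper, not part of the job above: copies the existing producer
// properties and sets the idempotence-related keys explicitly.
public class ExplicitIdempotenceConfig {

    public static Properties withExplicitIdempotence(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // A transactional producer requires idempotence to be enabled.
        props.setProperty("enable.idempotence", "true");
        // Idempotence requires acknowledgement from all in-sync replicas.
        props.setProperty("acks", "all");
        // Idempotence allows at most 5 in-flight requests per connection.
        props.setProperty("max.in.flight.requests.per.connection", "5");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("transaction.timeout.ms", String.valueOf(12 * 60000));
        Properties props = withExplicitIdempotence(base);
        // Print the effective settings; iteration order is not guaranteed.
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In my job the result would be passed to setKafkaProducerConfig(...) in place of producerProperties. I have not verified whether this makes any difference to the exception.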