*I. Scenario:*
Flink job logic: source (Kafka) -> data processing (WordCount logic) -> sink (Kafka)
1. Job A:
source_topic: word_count_topic
sink_topic: result_01
group_id: test-group01
2. Job B:
source_topic: word_count_topic
sink_topic: result_02
group_id: test-group02
3. Both jobs are built from the same jar and run the same code; only group.id and sink_topic differ.
4. FlinkKafkaProducer uses EXACTLY_ONCE semantics and writes to the topic through Kafka transactions.
*Symptom:*
The error logs below show that the transactionalIds of the two jobs interfere with each other.
*JobManager-side error log:*
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1227)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:741)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:90)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:231)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
	at org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:64)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
*TaskManager-side error log:*
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1227)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:837)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:605)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:504)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
2020-06-04 16:48:36,817 INFO org.apache.flink.runtime.taskmanager.Task - pend-name -> (Sink: pnt-name, Sink: sink-name) (1/1) (db4ee0c44888e866b3d26d39b34a0bd8) switched from RUNNING to FAILED.
*II. Preliminary analysis*
*1. What is happening*
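Although these are two independent Flink jobs writing transactional messages to different topics on the same Kafka cluster, they end up using exactly the same transactionalIds. Kafka allows only one active producer per transactionalId: when a second producer initializes with an ID that is already in use, the broker bumps the producer epoch and fences the older producer, which then fails with the ProducerFencedException shown above. So the two jobs keep fencing each other.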
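To make the fencing behaviour concrete, here is a deliberately simplified model of what the transaction coordinator does. It is my own sketch, not Kafka's code: the coordinator remembers the latest epoch per transactionalId, every initTransactions() call bumps it, and a producer holding an older epoch is rejected. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of producer fencing; NOT Kafka's implementation.
public class FencingModel {

    // Coordinator side: latest epoch handed out per transactionalId.
    private final Map<String, Integer> latestEpoch = new HashMap<>();

    // Minimal stand-in for one transactional producer instance.
    public final class Producer {
        private final String transactionalId;
        private final int epoch;

        private Producer(String transactionalId, int epoch) {
            this.transactionalId = transactionalId;
            this.epoch = epoch;
        }

        // Accepted only while this producer still holds the latest epoch for its id.
        public boolean send() {
            return latestEpoch.get(transactionalId) == epoch;
        }
    }

    // Plays the role of initTransactions(): bumps the epoch, fencing older holders of the id.
    public Producer initTransactions(String transactionalId) {
        int epoch = latestEpoch.merge(transactionalId, 1, Integer::sum);
        return new Producer(transactionalId, epoch);
    }

    public static void main(String[] args) {
        FencingModel coordinator = new FencingModel();
        String sharedId = "hypothetical-prefix-0"; // both jobs compute the same id
        Producer jobA = coordinator.initTransactions(sharedId);
        Producer jobB = coordinator.initTransactions(sharedId); // job B starts later
        System.out.println("job A send accepted: " + jobA.send()); // false: fenced
        System.out.println("job B send accepted: " + jobB.send()); // true
    }
}
```

Whichever job initializes last wins, and the other one fails; once it restarts and re-initializes, it fences the first job in turn.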
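*2. How the transactionalId is generated*
transactionalId = task name shown in the Flink UI (getTaskName()) + "-" + 32-character string returned by getOperatorUniqueID() (mapped 1:1 from the operator uid) + "-" + sequence number
With this formula, if the name and the uid are the same, the resulting transactionalId is necessarily the same as well.
*Relevant code (from FlinkKafkaProducer):*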
transactionalIdsGenerator = new TransactionalIdsGenerator(
getRuntimeContext().getTaskName() + "-" +
((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
kafkaProducersPoolSize,
SAFE_SCALE_DOWN_FACTOR);
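To check the collision claim without a cluster, the sketch below re-creates the naming scheme in plain Java. The prefix (taskName + "-" + operatorUniqueId) follows the code above; the numbering (nextFreeId + subtaskIndex * poolSize + i) and the pool size of 5 are my assumptions about how TransactionalIdsGenerator numbers its pool, and the operator ID value is made up.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical re-creation of the transactionalId layout; numbering is an assumption.
public class TransactionalIdSketch {

    public static Set<String> idsForSubtask(String taskName, String operatorUniqueId,
                                            int subtaskIndex, int poolSize, long nextFreeId) {
        // Prefix as passed to TransactionalIdsGenerator in the snippet above.
        String prefix = taskName + "-" + operatorUniqueId;
        Set<String> ids = new LinkedHashSet<>();
        for (int i = 0; i < poolSize; i++) {
            // Assumed numbering: each subtask owns a contiguous block of poolSize numbers.
            long number = nextFreeId + (long) subtaskIndex * poolSize + i;
            ids.add(prefix + "-" + number);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Same jar => same task name and same operator unique ID (hypothetical ID value).
        String taskName = "pend-name -> (Sink: pnt-name, Sink: sink-name)";
        String operatorUniqueId = "0123456789abcdef0123456789abcdef";

        Set<String> jobA = idsForSubtask(taskName, operatorUniqueId, 0, 5, 0);
        Set<String> jobB = idsForSubtask(taskName, operatorUniqueId, 0, 5, 0);

        System.out.println(jobA);
        System.out.println("identical: " + jobA.equals(jobB)); // true
    }
}
```

Nothing job-specific (topic, group.id) enters the computation, so two submissions of the same jar get identical sets.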
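*3. When transactionalIds collide*
The same transactionalId can end up in use in the following situations:
1. (As in the scenario described at the top) Multiple running instances of the same jar that differ only in topic and group.id.
2. Different jobs with a similar code structure, none of which explicitly sets the name and uid of the FlinkKafkaProducer sink operator.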
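Both cases come down to the operator unique ID being deterministic. As I understand it, Flink derives this 16-byte ID (printed as 32 hex characters) from the uid when one is set, and otherwise from the job topology. The sketch below illustrates only that determinism, using MD5 as a stand-in hash; Flink uses its own hashing, so the real values differ, and the topology string is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustration only: MD5 stands in for Flink's own operator-ID hashing.
public class OperatorIdSketch {

    // 16-byte digest rendered as 32 lowercase hex characters.
    public static String stableId(String input) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }

    // Hypothetical: hash the uid if one was set, otherwise a description of the topology.
    public static String operatorUniqueId(String uidOrNull, String topologyDescription) {
        return stableId(uidOrNull != null ? "uid:" + uidOrNull : "topology:" + topologyDescription);
    }

    public static void main(String[] args) {
        String topology = "kafka-source -> wordcount -> kafka-sink";
        // No uid and the same structure: the same ID every time.
        System.out.println(operatorUniqueId(null, topology).equals(operatorUniqueId(null, topology)));
        // Explicit, job-specific uids: different IDs.
        System.out.println(operatorUniqueId("sink-result_01", topology)
                .equals(operatorUniqueId("sink-result_02", topology)));
    }
}
```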
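*III. How to avoid identical transactionalIds*
1. Always give the sink operator a name and uid that cannot collide with those of any other job,
or
2. Override the TransactionalIdsGenerator logic and replace the 32-character value derived from the uid with a UUID.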
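A rough sketch of option 2 in plain Java rather than as a patch to Flink: the layout stays taskName + "-" + X + "-" + number, but X is a random UUID with its hyphens stripped, which is also 32 hex characters. The class is hypothetical, the numbering scheme is an assumption modelled on TransactionalIdsGenerator, and wiring this into FlinkKafkaProducer would mean overriding or copying the producer code.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch: same id layout, but the middle part is a random UUID.
public class UuidTransactionalIds {

    // 32 lowercase hex chars, the same width as the operator unique ID it replaces.
    public static String randomUniquePart() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    // Assumed numbering: each subtask owns a contiguous block of poolSize numbers.
    public static Set<String> idsForSubtask(String taskName, String uniquePart,
                                            int subtaskIndex, int poolSize, long nextFreeId) {
        String prefix = taskName + "-" + uniquePart;
        Set<String> ids = new LinkedHashSet<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (nextFreeId + (long) subtaskIndex * poolSize + i));
        }
        return ids;
    }

    public static void main(String[] args) {
        String taskName = "pend-name -> (Sink: pnt-name, Sink: sink-name)";
        // Two submissions of the same jar: same task name, different random middle part.
        Set<String> jobA = idsForSubtask(taskName, randomUniquePart(), 0, 5, 0);
        Set<String> jobB = idsForSubtask(taskName, randomUniquePart(), 0, 5, 0);
        System.out.println("overlap: " + !Collections.disjoint(jobA, jobB)); // false
    }
}
```

The random part must be generated only once per job and then kept; regenerating it on every restart would leave the old transactions unknown to the new IDs.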
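*I think using a UUID is acceptable, for the following two reasons:*
1. A newly submitted Flink job generates its transactionalIds from the UUID and stores them in the userContext, which is ultimately persisted in state.
2. For a job restored from state, Flink's existing logic does not regenerate the transactionalIds: it reads the userContext back from state, takes the transactionalIds from it, and keeps reusing those IDs in rotation from then on.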
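The two points above can be modelled as follows. This is a simplified sketch: UserContext and initialize() are hypothetical stand-ins for the userContext kept in operator state and for the initialization path, not Flink's actual API. A fresh start generates IDs from a new UUID; a restore simply returns what was persisted, so the IDs stay stable across restarts.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

// Simplified model of the two cases; not Flink's state API.
public class RestoreModel {

    // Hypothetical stand-in for the userContext that ends up in checkpointed state.
    public static final class UserContext {
        public final Set<String> transactionalIds;

        public UserContext(Set<String> transactionalIds) {
            this.transactionalIds = Collections.unmodifiableSet(new LinkedHashSet<>(transactionalIds));
        }
    }

    public static UserContext initialize(Optional<UserContext> restored, String taskName, int poolSize) {
        if (restored.isPresent()) {
            // Restored job: reuse the IDs from state; nothing is regenerated.
            return restored.get();
        }
        // New job: generate once from a fresh UUID; this context would then be checkpointed.
        String uniquePart = UUID.randomUUID().toString().replace("-", "");
        Set<String> ids = new LinkedHashSet<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add(taskName + "-" + uniquePart + "-" + i);
        }
        return new UserContext(ids);
    }

    public static void main(String[] args) {
        UserContext firstRun = initialize(Optional.empty(), "sink-name", 5);
        UserContext restoredRun = initialize(Optional.of(firstRun), "sink-name", 5);
        UserContext otherSubmission = initialize(Optional.empty(), "sink-name", 5);
        System.out.println("stable across restore: "
                + firstRun.transactionalIds.equals(restoredRun.transactionalIds)); // true
        System.out.println("overlap with another submission: "
                + !Collections.disjoint(firstRun.transactionalIds, otherSubmission.transactionalIds)); // false
    }
}
```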
*IV. Questions*
*Has anyone else run into this problem? How did you solve it?*
*Thanks, everyone!*