I keep receiving the exception below while running a simple job that reads
time series data from Kafka, transforms it into Avro format, and then writes
it to a Kafka topic consumed by Druid.
Any advice on how to resolve this type of error would be appreciated.
I'm using Apache Kafka 2.6.2 via Amazon MSK and Flink 1.12.1.

The producer configuration and sink setup:

private Properties buildKafkaProducerConfigProperties() {
    final Properties properties = new Properties();
    properties.setProperty("bootstrap.servers",
            String.join(",", druidProducerKafkaBootstrapServers));
    if (druidProducerZookeeperConnect != null) {
        properties.setProperty("zookeeper.connect", druidProducerZookeeperConnect);
    }
    if (druidProducerTransactionMaxTimeoutMs > 0) {
        properties.setProperty("transaction.max.timeout.ms",
                Integer.toString(druidProducerTransactionMaxTimeoutMs));
    }
    if (druidProducerTransactionTimeoutMs > 0) {
        properties.setProperty("transaction.timeout.ms",
                Integer.toString(druidProducerTransactionTimeoutMs));
    }
    properties.setProperty("transactional.id", "local.druid");
    if (druidProducerKafkaRequiresSsl) {
        properties.setProperty("ssl.truststore.location", kafkaSslTruststoreLocation);
        properties.setProperty("ssl.truststore.password", kafkaSslTruststorePassword);
        properties.setProperty("security.protocol", "SSL");
    }
    properties.setProperty("offsets.topic.replication.factor", "3");
    properties.setProperty("transaction.state.log.replication.factor", "1");
    return properties;
}

private void addToDruidProducerSink(DataStream<SensorMessage> sensorLineStream) {
    final Properties producerProperties = buildKafkaProducerConfigProperties();

    FlinkKafkaProducer<SensorMessage> producerTopic = new FlinkKafkaProducer<>(
            druidProducerKafkaTopic,
            new DruidSensorMessageKafkaSerializationSchema(druidProducerKafkaTopic),
            producerProperties,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

    final DataStreamSink<SensorMessage> producerSink =
            sensorLineStream.addSink(producerTopic);

    // Workaround for producer transaction id failures is to give the producer a unique name.
    // https://issues.apache.org/jira/browse/FLINK-11654
    producerSink
            .name("druid sink " + druidProducerKafkaGroupId)
            .uid("druid-sink-" + druidProducerKafkaGroupId);
}
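
One thing that may be worth checking in the properties built above: to the best of my knowledge, zookeeper.connect, transaction.max.timeout.ms, offsets.topic.replication.factor and transaction.state.log.replication.factor are broker-side settings, so a producer client would be expected to ignore them rather than apply them (on MSK they would belong in the cluster configuration). The sketch below is a hypothetical helper, not part of Flink or Kafka: the class name ProducerConfigCheck, the method brokerOnlyKeys and the hand-maintained key list are all assumptions made for illustration.

```java
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not a Flink or Kafka API): reports keys in a producer
// Properties object that are believed to be broker-side settings.
final class ProducerConfigCheck {

    // Assumption: this list is maintained by hand and only covers the keys
    // that appear in the snippet above; it is not derived from the Kafka client.
    private static final Set<String> BROKER_ONLY_KEYS = Set.of(
            "zookeeper.connect",
            "transaction.max.timeout.ms",
            "offsets.topic.replication.factor",
            "transaction.state.log.replication.factor");

    private ProducerConfigCheck() {
    }

    // Returns the broker-side keys found in props, sorted alphabetically.
    static Set<String> brokerOnlyKeys(Properties props) {
        Set<String> found = new TreeSet<>();
        for (String key : props.stringPropertyNames()) {
            if (BROKER_ONLY_KEYS.contains(key)) {
                found.add(key);
            }
        }
        return found;
    }
}
```

Calling ProducerConfigCheck.brokerOnlyKeys(buildKafkaProducerConfigProperties()) before creating the sink would list the entries that probably have no effect on the client, which can help narrow down whether the transaction timeouts in effect are really the ones intended.
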

2022-02-22 14:41:37
java.io.IOException: Could not perform checkpoint 1 for operator split -> Sink: druid sink producer.local.druid (1/1)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:968)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
    at org.apache.flink.streaming.runtime.io.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:156)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:178)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        ... 3 more
    Caused by: org.apache.kafka.common.errors.ProducerFencedException: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator split -> Sink: druid sink producer.local.druid (1/1)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:951)
    ... 13 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1095)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1002)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:99)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:320)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
    ... 23 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
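
The last message names two possible causes: a newer producer with the same transactionalId, or a transaction expired by the broker. For the second case, a quick sanity check of the timeouts might look like the sketch below. It is only an illustration under stated assumptions: the class TransactionTimeoutCheck and its method are hypothetical names, and the two rules it encodes (the producer's transaction.timeout.ms should not exceed the broker's transaction.max.timeout.ms, and should be longer than the checkpoint interval) are general guidance for transactional sinks, not a confirmed diagnosis of this job.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink or Kafka API): checks how the producer
// transaction timeout relates to the broker limit and the checkpoint interval.
final class TransactionTimeoutCheck {

    private TransactionTimeoutCheck() {
    }

    // Returns a human-readable list of suspected problems; empty if none found.
    static List<String> problems(long checkpointIntervalMs,
                                 long producerTxnTimeoutMs,
                                 long brokerMaxTxnTimeoutMs) {
        List<String> out = new ArrayList<>();
        // The broker refuses transaction timeouts above its configured maximum.
        if (producerTxnTimeoutMs > brokerMaxTxnTimeoutMs) {
            out.add("transaction.timeout.ms (" + producerTxnTimeoutMs
                    + ") exceeds broker transaction.max.timeout.ms ("
                    + brokerMaxTxnTimeoutMs + ")");
        }
        // A transaction stays open from one checkpoint to the next, so a timeout
        // at or below the checkpoint interval lets the broker expire it first.
        if (producerTxnTimeoutMs <= checkpointIntervalMs) {
            out.add("transaction.timeout.ms (" + producerTxnTimeoutMs
                    + ") is not longer than the checkpoint interval ("
                    + checkpointIntervalMs + ")");
        }
        return out;
    }
}
```

For example, TransactionTimeoutCheck.problems(60_000L, 900_000L, 900_000L) returns an empty list, while a producer timeout of one hour against a 15-minute broker maximum would be reported. The broker value has to come from the MSK cluster configuration, since the client cannot set it.
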