Hi all,
I need to handle Kafka producer exceptions so that they don’t bring down the
job. To do that, I extended FlinkKafkaProducer010 and handled the exceptions
in invoke():
    public void invoke(T value, Context context) throws Exception {
        try {
            this.checkErroneous();
            ...
            this.producer.send(record, this.callback);
        } catch (Exception exception) {
            // Handle exception
        }
    }
The problem is that checkErroneous() is called at the beginning of invoke(),
so the catch block is triggered while processing the next message, not the
message that caused the exception. So I moved checkErroneous() below
producer.send(), like so:
    public void invoke(T value, Context context) throws Exception {
        try {
            ...
            this.producer.send(record, this.callback);
            this.checkErroneous();
        } catch (Exception exception) {
            // Handle exception
        }
    }
This solved the issue: the exceptions are now thrown for the message that’s
causing the error instead of for the next message.
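To make the difference concrete, here is a minimal standalone sketch of the two orderings. It is a simulation, not Flink or Kafka code: SimulatedSink, send(), and blamedMessages() are hypothetical names, and it assumes the failure callback runs before send() returns. With a real producer the callback may fire later, in which case the error could still surface on a subsequent message.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone simulation; none of these names are Flink or Kafka APIs. */
final class CheckOrderDemo {

    /** Hypothetical stand-in for the sink: remembers the last send failure. */
    static final class SimulatedSink {
        private final String badMessage;
        private final boolean checkAfterSend;
        // Set by the simulated callback, read and cleared by checkErroneous().
        private Exception asyncException;

        SimulatedSink(String badMessage, boolean checkAfterSend) {
            this.badMessage = badMessage;
            this.checkAfterSend = checkAfterSend;
        }

        /** Rethrows a previously recorded failure, if there is one. */
        private void checkErroneous() throws Exception {
            Exception e = asyncException;
            if (e != null) {
                asyncException = null;
                throw e;
            }
        }

        /** ASSUMPTION: the failure is recorded before send() returns. */
        private void send(String message) {
            if (message.equals(badMessage)) {
                asyncException = new Exception("failed to send " + message);
            }
        }

        /** Returns true if the catch block fired while handling this message. */
        boolean invoke(String message) {
            try {
                if (!checkAfterSend) {
                    checkErroneous(); // original order: check, then send
                }
                send(message);
                if (checkAfterSend) {
                    checkErroneous(); // modified order: send, then check
                }
                return false;
            } catch (Exception e) {
                return true;
            }
        }
    }

    /** Feeds the messages through a sink and lists those that hit the catch block. */
    static List<String> blamedMessages(
            List<String> messages, String badMessage, boolean checkAfterSend) {
        SimulatedSink sink = new SimulatedSink(badMessage, checkAfterSend);
        List<String> blamed = new ArrayList<>();
        for (String message : messages) {
            if (sink.invoke(message)) {
                blamed.add(message);
            }
        }
        return blamed;
    }
}
```

For the messages a, b, c where b fails, blamedMessages(messages, "b", false) returns [c] and blamedMessages(messages, "b", true) returns [b]. Note that in the check-first variant of this sketch, c is also never sent, because the exception is thrown before send() is reached.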
Is there a specific reason why checkErroneous() is called at the top of
invoke()? Or am I doing something wrong?
Class:
https://github.com/apache/flink/blob/19d20e5cf8d44d726b4a44575e6c8db677e4c3c8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
Regards,
Harshith