Hi

We’ve got a job producing to a Kafka sink. The Kafka topics have a retention of 
2 weeks. When doing a complete replay, it seems like Flink isn’t able to apply 
backpressure or throttle the number of messages going to Kafka, which causes the 
following error:

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Expiring 8396 record(s) for topic-1:120000 ms has passed since 
batch creation

We’re running Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka cluster 
is running version 2.1.1. The Kafka producer uses all default settings except 
for:

compression.type = snappy
max.in.flight.requests.per.connection = 1
acks = all
client.dns.lookup = use_all_dns_ips
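
For context, the sink is wired up roughly like this. This is a minimal sketch, 
not our actual code: the topic names, bootstrap servers, group.id and 
SimpleStringSchema are placeholders, the real job reads four topics rather than 
the two shown here, and it uses its own serialization schema.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class ReplaySinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
        consumerProps.setProperty("group.id", "replay-sketch");        // placeholder

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
        producerProps.setProperty("compression.type", "snappy");
        producerProps.setProperty("max.in.flight.requests.per.connection", "1");
        producerProps.setProperty("acks", "all");
        producerProps.setProperty("client.dns.lookup", "use_all_dns_ips");

        // Stand-in for our four input topics; the real job unions four consumers.
        DataStream<String> a = env.addSource(
            new FlinkKafkaConsumer<>("input-topic-a", new SimpleStringSchema(), consumerProps));
        DataStream<String> b = env.addSource(
            new FlinkKafkaConsumer<>("input-topic-b", new SimpleStringSchema(), consumerProps));

        // Universal connector from flink-connector-kafka 1.7.x;
        // the default semantic here is AT_LEAST_ONCE.
        a.union(b).addSink(
            new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), producerProps));

        env.execute("kafka-replay-sketch");
    }
}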

I tried playing around with the buffer and batch settings and with increasing 
the timeouts, but none of them seem to be what we need. Increasing 
delivery.timeout.ms and request.timeout.ms (see the sketch below the error) 
gets past the initial error, but then the Flink job fails entirely with:

Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
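
For reference, the timeout overrides were along these lines. The values are 
only illustrative, not the exact ones we tried, and producerProps refers to the 
object in the sketch above.

// Applied on top of producerProps; Kafka requires
// delivery.timeout.ms >= linger.ms + request.timeout.ms.
producerProps.setProperty("request.timeout.ms", "300000");   // default 30000
producerProps.setProperty("delivery.timeout.ms", "900000");  // default 120000,
    // which is the "120000 ms" showing up in the first error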

My assumption is that the Kafka producer starts blocking once it notices that 
it can't deliver its batches, and that Flink eventually runs out of buffers for 
the operator.
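
If that assumption holds, the producer knobs involved would be the buffering 
ones, which we left at their defaults. Listed here explicitly just for 
reference, with the defaults as of the 2.1 producer as far as I know:

// When the accumulator (buffer.memory) fills up, send() blocks for up to
// max.block.ms before throwing, which is where I suspect the sink gets stuck.
producerProps.setProperty("buffer.memory", "33554432");  // 32 MB, the default
producerProps.setProperty("max.block.ms", "60000");      // 60 s, the default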

What really baffles me is that the backpressure tab shows that everything is 
OK. The entire pipeline (which reads from 4 different topics, unions them all 
and sinks to a single topic) pushes all the messages through to the sink stage, 
resulting in 18 million incoming messages at that stage, even though Kafka 
cannot possibly keep up with that rate.

I searched for others facing the same issue but couldn't find anything 
similar. I'm hoping someone here can point me in the right direction.

Thanks in advance
