Hello,
Since the last update to the universal Kafka connector, I've been getting
the following error fairly often:
/2019-11-18 15:42:52,689 ERROR
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer
clientId=consumer-10, groupId=srx-consumer-group] Offset commit failed on
partition events-4 at offset 38173628004: The request timed out.
2019-11-18 15:42:52,707 WARN
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher -
Committing offsets to Kafka failed. This does not compromise Flink's
checkpoints.
2019-11-18 15:42:52,707 WARN
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async
Kafka commit failed./
---
I've been investigating but can't find the cause. We monitor several
consumer metrics, such as recordsConsumed, fetch-size-avg and fetch-rate,
and their values are the same whether or not the error occurs. So we know
the problem doesn't coincide with a spike in events or a larger fetch size.
We also monitor CPU, memory, GC, network I/O, network connections and disk
I/O, and haven't found anything out of the ordinary.
Our job has two source nodes reading from two distinct Kafka topics, and
the problem happens on both.
*Flink Version:* 1.8.2
*Kafka Version:* 2.3.0
*My Kafka consumer properties:*
/2019-11-14 16:51:20,142 INFO
org.apache.kafka.clients.consumer.ConsumerConfig -
ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [hidden]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = srx-consumer-group
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = /etc/pki/java/ca.ks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /etc/pki/java/ca.ks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
/
*Commit latency vs commits failed:*
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/Screenshot_2019-11-26_at_09.png>
We have checkpointing enabled (synchronous checkpoints every 10 seconds).
The job is also configured to commit Kafka offsets on each checkpoint (the
default behaviour).
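
For reference, here is a minimal sketch of the consumer settings most relevant to the commit timeout, built with plain java.util.Properties. The values are copied from the ConsumerConfig log above. The class name ConsumerSettings, the method build and the broker address are hypothetical placeholders (the real bootstrap servers are hidden). In the actual job these properties are handed to the Flink Kafka consumer, which is not shown here.

```java
import java.util.Properties;

public class ConsumerSettings {

    // Hypothetical helper: collects the effective consumer settings that
    // appear in the ConsumerConfig log and relate to offset commits.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", "srx-consumer-group");
        // Kafka's own auto-commit is off; offsets are committed on checkpoints.
        props.setProperty("enable.auto.commit", "false");
        // Timeouts in effect when "The request timed out" is logged.
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("default.api.timeout.ms", "60000");
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("security.protocol", "SSL");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address, not the real cluster.
        Properties props = build("broker-placeholder:9093");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Note that request.timeout.ms (30 seconds) is three times our checkpoint interval (10 seconds).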
-----
Best Regards,
Pedro Chaves