Our topology is running version 1.2.3 of storm-kafka-client and version 2.3.0 of kafka-clients. Over a recent weekend, we noticed the following error just before the topology crashed:
    2019-10-18 21:05:16,256 ERROR util [Thread-130-customKafkaSpout-executor[17 17]] Async loop died!
    java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 60000ms
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) ~[storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]

These crashes coincided with Kafka broker bounces. Our Kafka cluster has 6 brokers, and each partition has 6 replicas. Only one broker was ever down at a time, so the ISR of each partition in the topic should never have dropped below 5. Since we catch exceptions in the main KafkaSpout thread, this exception seems to come from somewhere outside of it. Looking a little further into the Kafka code, it is thrown by the private method fetchOffsetsByTimes in Fetcher.java:
https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L538.
This method is called by Consumer.offsetsForTimes, Consumer.beginningOffsets, and Consumer.endOffsets. I noticed that beginningOffsets and endOffsets are called in Storm's KafkaOffsetMetric:
https://github.com/apache/storm/blob/e21110d338fe8ca71b904682be35642a00de9e78/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java#L80,
which would explain the error happening outside of the KafkaSpout nextTuple
thread. So a couple of questions:
- Is this a known error? It seemed to happen every time a broker went down.
- What are these metrics, and where can they be viewed? Is there a way to disable them?
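To make the suspected failure path concrete, here is a minimal, Kafka-free Java sketch. It assumes a hypothetical OffsetSource interface standing in for the Consumer's endOffsets call and a hypothetical OffsetLookupTimeout standing in for org.apache.kafka.common.errors.TimeoutException; partitions are plain integers rather than TopicPartitions. It computes a total lag (end offset minus committed offset) the way a lag metric might, and shows one defensive option: catching the timeout and skipping the sample so it cannot propagate into the executor's async loop. This illustrates the pattern only; it is not how KafkaOffsetMetric is actually implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class OffsetLagSketch {

    // Hypothetical stand-in for the Consumer.endOffsets call made by a lag metric.
    // Partitions are plain ints here instead of TopicPartition.
    interface OffsetSource {
        Map<Integer, Long> endOffsets(Set<Integer> partitions);
    }

    // Hypothetical unchecked exception standing in for
    // org.apache.kafka.common.errors.TimeoutException (also a RuntimeException).
    static class OffsetLookupTimeout extends RuntimeException {
        OffsetLookupTimeout(String message) {
            super(message);
        }
    }

    // Sums (end offset - committed offset) over all partitions.
    // Returns null when the offset lookup times out, so the caller can skip
    // this metrics sample instead of letting the exception propagate.
    static Long totalLagOrNull(OffsetSource source, Map<Integer, Long> committed) {
        try {
            Map<Integer, Long> end = source.endOffsets(committed.keySet());
            long total = 0L;
            for (Map.Entry<Integer, Long> e : committed.entrySet()) {
                total += end.get(e.getKey()) - e.getValue();
            }
            return total;
        } catch (OffsetLookupTimeout ex) {
            return null;
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 90L);
        committed.put(1, 45L);

        // All brokers reachable: lag is (100 - 90) + (50 - 45) = 15.
        OffsetSource healthy = partitions -> {
            Map<Integer, Long> end = new HashMap<>();
            end.put(0, 100L);
            end.put(1, 50L);
            return end;
        };

        // Broker bounce: the lookup gives up after its timeout.
        OffsetSource bouncing = partitions -> {
            throw new OffsetLookupTimeout("Failed to get offsets by times in 60000ms");
        };

        System.out.println(totalLagOrNull(healthy, committed));  // prints 15
        System.out.println(totalLagOrNull(bouncing, committed)); // prints null
    }
}
```

The trade-off in this sketch is losing one metrics sample during a broker bounce instead of losing the whole executor.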