Whoops, I think the user mailing list fell off the reply list. Sorry, didn't mean to mail you directly.
I haven't heard of this before, but people may have encountered it without mentioning it. I am not aware of a workaround. You're right that it would be good to get this fixed. https://issues.apache.org/jira/browse/STORM-3529 is open if you want to work on it. I think it should be pretty easy to catch and log RetriableExceptions in the same way we do elsewhere in the spout.

On Mon, Nov 4, 2019 at 19:29, Mitchell Rathbun (BLOOMBERG/ 731 LEX) <[email protected]> wrote:

> I increased the "default.api.timeout.ms" as mentioned, and am still
> getting the error. I need to dig into the Kafka code some more, but if
> there isn't a simple config fix I can make, then isn't this a critical
> issue? A Kafka broker being brought down should not be a fatal issue for
> the topology, especially when the exception is coming from a non-critical
> metrics class. Do you know of any workarounds to this/has this been seen
> before? It seems like it should be a pretty common issue, given that broker
> turnaround in Kafka is not that uncommon.
>
> From: Mitchell Rathbun (BLOOMBERG/ 731 LEX) At: 10/28/19 19:20:30
> To: [email protected]
> Subject: Re: Offset fetch failure causing topology crash
>
> I did some digging and I believe that we are now seeing this due to a
> recent upgrade from Kafka 0.10.1.0 to 2.3.0. The timeout used for the
> beginningOffsets/endOffsets calls was reduced from over 5 minutes to 60
> seconds with this change. In 2.3.0, this timeout is set by
> "default.api.timeout.ms". There is also a property called
> "offset.commit.period.ms", which controls how often offsets are committed
> in the KafkaSpout. This property has a default value of 30 seconds. So if
> this fails once due to a broker that was a leader for one of the consumer's
> partitions being brought down, then the next time commits are attempted
> would constitute a timeout.
> So I am going to try either reducing "offset.commit.period.ms" or
> increasing "default.api.timeout.ms" and see if that fixes the issue.
>
> From: [email protected] At: 10/25/19 18:57:31
> To: Mitchell Rathbun (BLOOMBERG/ 731 LEX ) <[email protected]>
> Subject: Re: Offset fetch failure causing topology crash
>
> See
> https://github.com/apache/storm/blob/7b1a98fc10fad516ef9ed0b3afc53a1d7be8a169/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L157
> and
> https://github.com/apache/storm/blob/7b1a98fc10fad516ef9ed0b3afc53a1d7be8a169/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
> .
>
> I don't think there's a flag to disable the metrics currently. Regarding
> where they can be viewed, please see the section on metrics consumers at
> https://storm.apache.org/releases/2.0.0/Metrics.html.
>
> We might want to change the metrics class to catch RetriableExceptions and
> ignore them (with logging). I have raised
> https://issues.apache.org/jira/browse/STORM-3529.
>
> On Fri, Oct 25, 2019 at 21:09, Mitchell Rathbun (BLOOMBERG/ 731 LEX)
> <[email protected]> wrote:
>
>> Our topology is running version 1.2.3 and 2.3.0 for kafka-clients. We
>> recently noticed the following before crashing on a weekend:
>>
>> 2019-10-18 21:05:16,256 ERROR util [Thread-130-customKafkaSpout-executor[17 17]] Async loop died!
>> java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 60000ms
>>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) ~[storm-core-1.2.1.jar:1.2.1]
>>     at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.7.0.jar:?]
>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
>>
>> These crashes coincided with Kafka broker bounces. Our kafka cluster has
>> 6 brokers, and each partition has 6 replicas. Only one broker was ever down
>> at once, so the ISR of each partition in the topic seemed to never be lower
>> than 5. This exception seemed to come from outside of the main kafka spout
>> thread since we are catching exceptions in there. Looking into the kafka
>> code a little further, this comes from the private method
>> fetchOffsetsByTimes in the Fetcher.java class:
>> https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L538.
>> This method is called by Consumer.offsetsByTimes,
>> Consumer.beginningOffsets, and Consumer.endOffsets.
>> I noticed that beginningOffsets and endOffsets are called here:
>> https://github.com/apache/storm/blob/e21110d338fe8ca71b904682be35642a00de9e78/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java#L80,
>> which would explain the error happening outside of the KafkaSpout nextTuple
>> thread. So a couple of questions:
>>
>> -Is this a known error? It seemed to happen every time a broker came down
>> -What are these metrics/where can they be viewed? Is there a way to
>> disable them?
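
For anyone picking up STORM-3529, the idea discussed in this thread (catch RetriableException in the offset metric, log it, and skip that reporting interval instead of letting the executor die) might look roughly like the sketch below. This is only an illustration under assumptions, not the actual KafkaOffsetMetric code or the eventual fix: GuardedOffsetFetch and fetchOrNull are hypothetical names, and the two nested exception classes are local stand-ins for org.apache.kafka.common.errors.RetriableException and its subclass TimeoutException so that the example compiles without kafka-clients on the classpath. A real patch would use the spout's logger rather than System.err.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of storm-kafka-client.
final class GuardedOffsetFetch {

    // Local stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    // Local stand-in for org.apache.kafka.common.errors.TimeoutException,
    // which extends RetriableException in kafka-clients.
    static class TimeoutException extends RetriableException {
        TimeoutException(String message) { super(message); }
    }

    private GuardedOffsetFetch() {}

    // Runs the offset lookup (for example a call wrapping endOffsets or
    // beginningOffsets). A retriable failure is logged and turned into null
    // so the caller can skip this interval; any other exception propagates.
    static Long fetchOrNull(Supplier<Long> offsetFetcher) {
        try {
            return offsetFetcher.get();
        } catch (RetriableException e) {
            System.err.println("WARN: skipping offset metrics for this interval: " + e.getMessage());
            return null;
        }
    }
}
```

With this shape, a broker bounce that makes the lookup time out would cost one metrics data point rather than the whole topology, while non-retriable errors still surface as before.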
