We have a fairly simple class that runs in a loop, consumes messages from Kafka, and feeds them to our stream processing system.

    {
      .....
      consumerConnector = Consumer.create(new ConsumerConfig(props))
      val topicMessageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
      // We only care about the first streamList from topicMessageStreams
      ...
      kafkaStream = topicMessageStreams(topic)(0)
      while (true) {
        val logMessage: String = Utils.toString(kafkaStream.head.payload, "UTF-8")
        // do stuff with the message.
      }
    }

When this code gets an exception, it swallows it on the assumption that the error is transient, and continues on its merry way. Obviously this isn't the right thing to do in all cases (or perhaps in any case): over a weekend, this code kept getting the same exception and eventually logged many hundreds of gigabytes of error messages before it was restarted.

The exception we were getting from Kafka was:

    java.lang.IllegalStateException: Iterator is in failed state
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:46)
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:35)
        at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:38)

I was wondering which exceptions are transient and which ones need special handling (say, reconnecting to Kafka, or just exiting the JVM and having our job monitors restart the process). For example, with the iterator in a failed state, would creating a new connector have helped?

Any help would be appreciated.

Thanks,
Manish