In that case, is there a way to detect that a consumer instance is no longer usable, so that we can recreate the instance on the fly again to have it reconnect? Without having to restart our app?
Thanks, -Vinh On Feb 11, 2014, at 7:45 AM, Jun Rao <jun...@gmail.com> wrote: > We do catch the exception. However, we don't know what to do with it. > Retrying may not fix the problem. So, we just log it and let the thread die. > > Thanks, > > Jun > > > On Mon, Feb 10, 2014 at 8:42 PM, Philip O'Toole <phi...@loggly.com> wrote: > >> Yes, there might be - we experience link resets every so often, and >> definitely did today. >> >> Assume it is this, are you surprised the thread went down? Perhaps we need >> to catch this? >> >> Philip >> >>> On Feb 10, 2014, at 8:38 PM, Jun Rao <jun...@gmail.com> wrote: >>> >>> This indicates that message checksum validation failed. Is there any >> issue >>> with the network? >>> >>> Thanks, >>> >>> Jun >>> >>> >>>> On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole <phi...@loggly.com> >> wrote: >>>> >>>> Saw this thrown today, which brought down a Consumer thread -- we're >> using >>>> Consumers built on the High-level consumer framework. What may have >>>> happened here? We are using a custom C++ Producer which does not do >>>> compression, and which hasn't changed in months, but this error is >>>> relatively new to us, and is occurring occasionally. We are running the >> Sun >>>> JDK: >>>> >>>> java version "1.7.0_25" >>>> Java(TM) SE Runtime Environment (build 1.7.0_25-b15) >>>> Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode) >>>> >>>> Restarting the Consumer clears it up, so the message on the Broker >> itself >>>> does not appear to be problematic. We are running 3 Consumers, each of >>>> which has 48 ConsumerConnector objects. Our code explicitly calls >> commit(), >>>> we do not auto-commit. >>>> >>>> Thanks, >>>> >>>> Philip >>>> >>>> 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in >>>> FetcherRunnable for premapped:2-29: fetched offset = 120758878080: >> consumed >>>> offset = 120758878080 >>>> kafka.message.InvalidMessageException: message is invalid, compression >>>> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init >> offset: >>>> 120758878080 >>>> at >>>> >>>> >> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) >>>> at >>>> >>>> >> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160) >>>> at >>>> >>>> >> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) >>>> at >>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) >>>> at >> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) >>>> at >>>> >>>> >> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64) >>>> at >>>> >>>> >> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59) >>>> at >>>> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57) >>>> at >>>> >>>> >> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79) >>>> at >>>> >>>> >> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65) >>>> at >>>> >>>> >> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59) >>>> at scala.collection.immutable.List.foreach(List.scala:45) >>>> at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65) >>>> 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in >>>> FetcherRunnable >>>> kafka.message.InvalidMessageException: message is invalid, compression >>>> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init >> offset: >>>> 120758878080 >>>> at >>>> >>>> >> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) >>>> at >>>> >>>> >> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160) >>>> at >>>> >>>> >> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) >>>> at >>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) >>>> at >> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) >>>> at >>>> >>>> >> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64) >>>> at >>>> >>>> >> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59) >>>> at >>>> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57) >>>> at >>>> >>>> >> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79) >>>> at >>>> >>>> >> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65) >>>> at >>>> >>>> >> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59) >>>> at scala.collection.immutable.List.foreach(List.scala:45) >>>> at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65) >>>> >>