I'd fix the Kafka version on the executor classpath (it should be
0.8.2.1) before trying anything else, even if it turns out to be
unrelated to the actual error. Definitely don't upgrade your brokers
to 0.9.
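
In case it helps, here is roughly what pinning that looks like on the
build side. This is only a sketch, assuming an sbt build with Scala
2.10; the versions are what I'd expect for Spark 1.6.0, so adjust for
Maven or your CDH packaging as needed:

  // build.sbt (sbt's Scala DSL): pin the Kafka client to match the 0.8.2.1 brokers
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
    // spark-streaming-kafka 1.6.0 pulls in the 0.8.2.1 Kafka client transitively
    "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0",
    // pin the client explicitly so a newer 0.9 jar can't be pulled in
    "org.apache.kafka" %% "kafka"                  % "0.8.2.1"
  )

If the 0.9 jars are coming from the cluster's classpath rather than
from your own assembly, you could also experiment with
spark.executor.userClassPathFirst=true at submit time so your 0.8.2.1
jars take precedence; I haven't verified that on CDH 5.7, so treat it
as something to try rather than a known fix.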

On Wed, May 25, 2016 at 2:30 AM, Scott W <defy...@gmail.com> wrote:
> I'm running into the below error while trying to consume messages from Kafka
> through Spark Streaming (Kafka direct API). This used to work OK when using
> the Spark standalone cluster manager. We're just switching to Cloudera 5.7,
> using YARN to manage the Spark cluster, and started to see the below error.
>
> A few details:
> - Spark 1.6.0
> - Using Kafka direct stream API
> - Kafka broker version (0.8.2.1)
> - Kafka version in the classpath of Yarn executors (0.9)
> - Kafka brokers not managed by Cloudera
>
> The only difference I see between using the standalone cluster manager and
> YARN is the Kafka version being used on the consumer end (0.8.2.1 vs. 0.9).
>
> Trying to figure out whether the version mismatch is really the issue. If it
> is, what would be the fix for this other than upgrading the Kafka brokers to
> 0.9 as well? (Eventually yes, but not for now.) Or is there something else I'm
> missing here?
>
> Appreciate the help.
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
> stage 200.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 200.0 (TID 203,..): java.nio.BufferUnderflowException
>     at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
>     at java.nio.ByteBuffer.get(ByteBuffer.java:715)
>     at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40)
>     at kafka.api.TopicData$.readFrom(FetchResponse.scala:96)
>     at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
>     at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
>     at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.immutable.Range.foreach(Range.scala:141)
>     at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>     at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
>     at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
>     at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>     at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>     at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>     at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>     at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>     at
> org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942)
>     at
> org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942)
>     at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
>     at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
>
>
> Driver stacktrace:
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

