Re: Spark Streaming - Kafka - java.nio.BufferUnderflowException

2016-05-25 Thread Cody Koeninger
I'd fix the Kafka version on the executor classpath (it should be
0.8.2.1) before trying anything else, even if it turns out to be unrelated
to the actual error. Definitely don't upgrade your brokers to 0.9.
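
To be concrete: spark-streaming-kafka 1.6.0 already pulls in the 0.8.2.1 client, so
the usual approach is to ship that jar with your app and make sure the executors
prefer it over the 0.9 jars CDH 5.7 puts on the classpath. A rough, untested sketch
(the object name, topic, and broker list below are made up for illustration):

// build.sbt -- the 0.8 direct API integration for Spark 1.6 depends on kafka 0.8.2.1
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
)

// DirectStreamSketch.scala -- minimal 0.8 direct-stream consumer
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-direct-0.8.2.1")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // The direct API talks to the brokers itself; only the broker list is needed here.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics      = Set("mytopic")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}

On the YARN side, setting spark.executor.userClassPathFirst=true (and
spark.driver.userClassPathFirst=true in yarn-cluster mode) at submit time should make
the 0.8.2.1 classes bundled with your app win over whatever Kafka version is on the
CDH classpath. Those confs are marked experimental, so try it on a test job first.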

On Wed, May 25, 2016 at 2:30 AM, Scott W  wrote:
> I'm running into the error below while trying to consume messages from Kafka
> through Spark Streaming (the Kafka direct API). This used to work fine when using
> the Spark standalone cluster manager. We're just switching to Cloudera 5.7,
> using YARN to manage the Spark cluster, and have started to see the error below.
>
> A few details:
> - Spark 1.6.0
> - Using the Kafka direct stream API
> - Kafka broker version: 0.8.2.1
> - Kafka version on the classpath of the YARN executors: 0.9
> - Kafka brokers not managed by Cloudera
>
> The only difference I see between using the standalone cluster manager and YARN
> is the Kafka version being used on the consumer side (0.8.2.1 vs. 0.9).
>
> I'm trying to figure out whether the version mismatch is really the issue. If it
> is, what would the fix be, other than also upgrading the Kafka brokers to 0.9
> (eventually yes, but not right now)? Or is there something else I'm missing here?
>
> Appreciate the help.
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 200.0 failed 4 times, most recent failure: Lost task 0.3 in stage 200.0 (TID 203,..): java.nio.BufferUnderflowException
> at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
> at java.nio.ByteBuffer.get(ByteBuffer.java:715)
> at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40)
> at kafka.api.TopicData$.readFrom(FetchResponse.scala:96)
> at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
> at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.immutable.Range.foreach(Range.scala:141)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942)
> at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
