That looks like the OOM is in the driver, when getting partition metadata
to create the direct stream.  In that case, executor memory allocation
doesn't matter.

Allocate more driver memory, or put a profiler on it to see what's taking
up heap.



On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <sourabh3...@gmail.com>
wrote:

> Adding Cody and Sriharsha
>
> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <sourabh3...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have ported receiver less spark streaming for kafka to Spark 1.2 and am
>> trying to run a spark streaming job to consume data form my broker, but I
>> am getting the following error:
>>
>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size 352518400
>> java.lang.OutOfMemoryError: Java heap space
>>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>         at
>> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>         at
>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>         at
>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>         at
>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>         at
>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>         at
>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>         at
>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>         at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>         at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>         at
>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>         at org.apache.spark.streaming.kafka.KafkaCluster.org
>> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>         at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>>         at
>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at
>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>>
>> I have tried allocating 100G of memory with 1 executor but it is still
>> failing.
>>
>> Spark version: 1.2.2
>> Kafka version ported: 0.8.2
>> Kafka server version: trunk version with SSL enabled
>>
>> Can someone please help me debug this.
>>
>> Thanks,
>> Sourabh
>>
>
>

Reply via email to