Hi Sourabh,

We have seen this error when the Kafka broker is running with SSL and the
consumer is trying to consume in plaintext mode.
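To explain why this surfaces as an OutOfMemoryError rather than a clean
connection failure: the plaintext consumer reads the first 4 bytes of the
broker's reply as a size prefix and then allocates a buffer of that size
(that is the BoundedByteBufferReceive.byteBufferAllocate frame in the trace
below). When the broker answers with TLS bytes instead of a Kafka response,
those bytes decode to a huge "size". A rough sketch of the arithmetic in
Scala (the exact bytes your broker sends back may differ):

    import java.nio.ByteBuffer

    // First bytes of a TLS record, e.g. an alert: 0x15, then the version.
    // These are illustrative, not necessarily what your broker sends.
    val tlsPrefix = Array[Byte](0x15, 0x03, 0x01, 0x00)

    // The consumer interprets them as a 4-byte response size...
    val bogusSize = ByteBuffer.wrap(tlsPrefix).getInt
    println(bogusSize)  // 352518400 -> a ~352 MB allocation, hence the heap OOM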
Are you using the high-level consumer or SimpleConsumer? If you are using
SimpleConsumer, pull the latest code from my repo
<https://github.com/relango/kafka/commits/kafka_security_0.8.2> and pass the
secure parameters to the SimpleConsumer constructor.
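Something along these lines. The five arguments shown are the stock 0.8.2
constructor (host, port, soTimeout, bufferSize, clientId); the secure
parameters my branch adds are only indicated in the comment, so please take
their exact names and order from SimpleConsumer.scala in the repo:

    import kafka.consumer.SimpleConsumer

    val consumer = new SimpleConsumer(
      "broker1.example.com",  // hypothetical broker host
      9093,                   // the broker's SSL listener port, not the plaintext port
      30000,                  // socket timeout in ms
      64 * 1024,              // receive buffer size in bytes
      "ssl-prototype-client"  // client id
      // ...plus the branch's secure parameters (e.g. an SSL flag and
      // keystore/truststore settings) -- the names are assumptions, check the repo
    )

Also make sure the port you pass is the broker's SSL listener; pointing the
consumer at the plaintext port (or the other way around) reproduces exactly
this kind of failure.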

Thanks,
Raja.

On Thu, Aug 6, 2015 at 9:01 PM, Sourabh Chandak <sourabh3...@gmail.com>
wrote:

> Hi,
>
> I am trying to integrate
> https://github.com/relango/kafka/tree/kafka_security_0.8.2 with Spark
> Streaming using the SimpleConsumer. I know that the SSL patch is on its
> way, but I need to set up a prototype, so I went ahead with Raja's version.
>
> So when I run my Spark job to retrieve data from one topic with just one
> partition, I get an OutOfMemoryError.
>
> Here is the stack trace:
>
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:91)
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>     at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>     at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>     at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>     at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:310)
>
> I need help from experts to resolve this.
>
> Thanks,
> Sourabh
>



