Hi,

We are building a Spark Streaming application which reads from Kafka, runs
updateStateByKey based on the received message type, and finally stores the
results in Redis.

After running for a few seconds, the executor process is killed with an
OutOfMemory error.

The code snippet is below:


val NoOfReceiverInstances = 1

val kafkaStreams = (1 to NoOfReceiverInstances).map(
  _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
)
val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}

ssc.union(kafkaStreams)
  .map(KafkaMessageMapper(_))
  .filter(...)
  .updateStateByKey(updateFunc)
  .foreachRDD(_.foreachPartition(RedisHelper.update(_)))
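To make the shape of updateFunc concrete, here is a minimal sketch rather than our actual code. The fields of IConcurrentUsers and the meaning of the (Long, Long) state (a running count and a last-seen timestamp) are assumptions made for illustration. The one behaviour it relies on is documented: updateStateByKey keeps a key's state until the update function returns None for that key.

```scala
object UpdateSketch {
  // Hypothetical message type; the real IConcurrentUsers is not shown in this mail.
  final case class IConcurrentUsers(delta: Long, timestampMs: Long)

  // Assumed state layout: (current user count, last-seen timestamp in ms).
  val updateFunc: (Seq[IConcurrentUsers], Option[(Long, Long)]) => Option[(Long, Long)] =
    (values, state) => {
      val (count, lastSeen) = state.getOrElse((0L, 0L))
      val newCount = count + values.map(_.delta).sum
      val newLastSeen = (lastSeen +: values.map(_.timestampMs)).max
      // Returning None removes the key from the state instead of keeping it forever.
      if (newCount <= 0L) None else Some((newCount, newLastSeen))
    }
}
```

With this shape, keys whose count drops to zero leave the state, so the state size follows the number of active keys rather than every key ever seen.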



object RedisHelper {
  private val client = scredis.Redis(
    ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
  )

  def update(itr: Iterator[(String, (Long, Long))]): Unit = {
    // redis save operation
  }
}
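As an illustration of the per-partition save step, the sketch below drains the iterator in fixed-size batches so that a whole partition is never materialised at once. It is not the real RedisHelper: the `save` callback stands in for the actual Redis write, and both the batch size and the "a:b" value encoding are assumptions.

```scala
object RedisUpdateSketch {
  // `save` is a hypothetical stand-in for the real Redis write; it receives one batch at a time.
  def update(itr: Iterator[(String, (Long, Long))], batchSize: Int = 500)
            (save: Seq[(String, String)] => Unit): Int = {
    var written = 0
    // grouped() pulls at most batchSize elements from the iterator per step.
    itr.grouped(batchSize).foreach { batch =>
      val encoded = batch.map { case (key, (a, b)) => key -> s"$a:$b" }
      save(encoded)
      written += encoded.size
    }
    written // number of entries handed to `save`
  }
}
```

It would be called from foreachPartition in the same way as RedisHelper.update, with the real write passed in as `save`.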


Below is the Spark configuration:


    spark.app.name = "XXXXXXX"
    spark.jars = "xxxx.jar"
    spark.home = "/spark-1.1.1-bin-hadoop2.4"
    spark.executor.memory = 1g
    spark.streaming.concurrentJobs = 1000
    spark.logConf = true
    spark.cleaner.ttl = 3600 // in seconds
    spark.default.parallelism = 12
    spark.executor.extraJavaOptions = "-Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError"
    spark.executor.logs.rolling.strategy = "size"
    spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
    spark.executor.logs.rolling.maxRetainedFiles = 10
    spark.serializer = "org.apache.spark.serializer.KryoSerializer"
    spark.kryo.registrator = "xxx.NoOpKryoRegistrator"


Other configurations are below:

streaming {
  // All streaming context related configs should come here
  batch-duration = "1 second"
  checkpoint-directory = "/tmp"
  checkpoint-duration = "10 seconds"
  slide-duration = "1 second"
  window-duration = "1 second"
  partitions-for-shuffle-task = 32
}
kafka {
  no-of-receivers = 1
  zookeeper-quorum = "xxxx:2181"
  consumer-group = "xxxxx"
  topic = "xxxxx:2"
}
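A small sanity check on the duration settings above, as a sketch using only the Scala standard library. As far as I know, Spark Streaming expects a DStream's checkpoint interval to be a multiple of its slide duration. The helper names here are hypothetical and not part of our application.

```scala
import scala.concurrent.duration._

object StreamingDurations {
  // Parses strings such as "1 second" or "10 seconds" into a finite duration.
  def parse(s: String): FiniteDuration = Duration(s) match {
    case f: FiniteDuration => f
    case other => throw new IllegalArgumentException(s"not a finite duration: $other")
  }

  // True when `d` is a whole multiple of `base`.
  def isMultipleOf(d: FiniteDuration, base: FiniteDuration): Boolean =
    d.toMillis % base.toMillis == 0
}
```

For the values above, the check would be StreamingDurations.isMultipleOf(parse("10 seconds"), parse("1 second")).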

We tried different combinations:
 - Spark 1.1.0 and 1.1.1
 - increasing the executor memory
 - changing the serialization strategy (switching between Kryo and the
   default Java serialization)
 - changing the broadcast strategy (switching between HTTP and torrent
   broadcast)


Can anyone offer any insight into what we are missing here? How can we fix this?

Due to an Akka version mismatch with some other libraries, we cannot upgrade
the Spark version.

Thanks,
-- 

Sourav Chandra
