Hi,

We are building a Spark Streaming application which reads from Kafka, applies updateStateByKey based on the received message type, and finally stores the results into Redis.
After running for a few seconds the executor process gets killed with an OutOfMemoryError. The code snippet is below:

    val NoOfReceiverInstances = 1

    val kafkaStreams = (1 to NoOfReceiverInstances).map(
      _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
    )

    val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}

    ssc.union(kafkaStreams)
      .map(KafkaMessageMapper(_))
      .filter(...)
      .updateStateByKey(updateFunc)
      .foreachRDD(_.foreachPartition(RedisHelper.update(_)))

    object RedisHelper {
      private val client = scredis.Redis(
        ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
      )

      def update(itr: Iterator[(String, (Long, Long))]) {
        // redis save operation
      }
    }

Below is the Spark configuration:

    spark.app.name = "XXXXXXX"
    spark.jars = "xxxx.jar"
    spark.home = "/spark-1.1.1-bin-hadoop2.4"
    spark.executor.memory = 1g
    spark.streaming.concurrentJobs = 1000
    spark.logConf = true
    spark.cleaner.ttl = 3600 // in seconds
    spark.default.parallelism = 12
    spark.executor.extraJavaOptions = "-Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError"
    spark.executor.logs.rolling.strategy = "size"
    spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
    spark.executor.logs.rolling.maxRetainedFiles = 10
    spark.serializer = "org.apache.spark.serializer.KryoSerializer"
    spark.kryo.registrator = "xxx.NoOpKryoRegistrator"

The other configurations are below:

    streaming {
      // All streaming context related configs should come here
      batch-duration = "1 second"
      checkpoint-directory = "/tmp"
      checkpoint-duration = "10 seconds"
      slide-duration = "1 second"
      window-duration = "1 second"
      partitions-for-shuffle-task = 32
    }

    kafka {
      no-of-receivers = 1
      zookeeper-quorum = "xxxx:2181"
      consumer-group = "xxxxx"
      topic = "xxxxx:2"
    }

We tried different
combinations, such as:

- Spark 1.1.0 and 1.1.1
- increasing the executor memory
- changing the serialization strategy (switching between Kryo and plain Java serialization)
- changing the broadcast strategy (switching between HTTP and torrent broadcast)

Can anyone give any insight into what we are missing here? How can we fix this? Due to an Akka version mismatch with some other libraries, we cannot upgrade the Spark version.

Thanks,

--
Sourav Chandra
Senior Software Engineer
sourav.chan...@livestream.com
o: +91 80 4121 8723
m: +91 988 699 3746
skype: sourav.chandra
Livestream
"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034
www.livestream.com