Hi,

I have a fairly simple stateful streaming job that suffers from high GC pressure, and its executors are killed for exceeding the size of the requested container. My current executor memory is 10G, the Spark memory overhead is 2G, and each executor runs with a single core.
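For reference, these are the relevant settings, written out here as the equivalent SparkConf calls (I actually pass them on the command line; the app name is a placeholder, and I'm assuming the YARN overhead key since it is the container limit that is exceeded):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("stateful-join")                        // placeholder name
  .set("spark.executor.memory", "10g")
  .set("spark.executor.cores", "1")
  .set("spark.yarn.executor.memoryOverhead", "2048")  // 2G; this key takes MB
  .set("spark.memory.fraction", "0.2")                // see the note at the end
  .set("spark.memory.storageFraction", "0")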
At first the job processes each batch faster than the batch interval (45s), but after a few batches the processing time grows well beyond it (2.1 minutes at times). This is the relevant code:

import ss.implicits._

val x: RDD[(String, Record)] = ss.sql(
    s"""select * from data_table where partition_ts >= ${DateTime.now.minusHours(10).toString("yyyyMMddHH")}"""
  ).as[Record]
  .map(r => (r.iid, r))
  .rdd

val stateSpec = StateSpec.function(trackStateFunc _)
  .numPartitions(16)
  .timeout(Durations.minutes(60 * 48))
  .initialState(x)

val ssc = new StreamingContext(sc, Seconds(45))
val sqlContext = ss.sqlContext

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicSet)

val stateStream = kafkaStream
  .transform(rdd => rdd.map(_._2))
  .transform { rdd =>
    val sparkSession = ss
    import sparkSession.implicits._
    sqlContext.read.schema(schema).json(rdd).as[Record].map(r => (r.iid, r)).rdd
  }
  .mapWithState(stateSpec)

stateStream.foreachRDD { rdd =>
  rdd.coalesce(16).toDF().write.mode(SaveMode.Append).insertInto("joineddata")
}

Right now, after playing with the parameters a bit, I'm running with spark.memory.storageFraction=0 and spark.memory.fraction=0.2.

Any help would be appreciated.

Thank you,
Daniel
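P.S. trackStateFunc isn't shown above; it has the standard mapWithState signature. A simplified sketch of its shape (mergeRecords is a placeholder for the actual update logic):

def trackStateFunc(key: String, value: Option[Record], state: State[Record]): Record = {
  val merged = (value, state.getOption) match {
    case (Some(incoming), Some(existing)) => mergeRecords(existing, incoming) // placeholder merge
    case (Some(incoming), None)           => incoming // first record seen for this key
    case (None, Some(existing))           => existing // timeout invocation: no new value
    case (None, None)                     => sys.error("unreachable: no value and no state")
  }
  // update() must not be called on a key that is timing out
  if (!state.isTimingOut()) state.update(merged)
  merged
}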