Hi,
I have a fairly simple stateful streaming job that suffers from high GC, and its executors are killed for exceeding the size of the requested container.
My current executor memory is 10G, the Spark memory overhead is 2G, and it's running with one core.
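
For context, those settings look roughly like this (just a sketch, not the exact submit command; the overhead key assumes a YARN deployment, since the executors are killed for exceeding their container):

    import org.apache.spark.SparkConf

    // Sketch of the executor settings described above (YARN overhead key is an assumption)
    val conf = new SparkConf()
      .set("spark.executor.memory", "10g")
      .set("spark.yarn.executor.memoryOverhead", "2048") // 2G; this key takes the value in MB
      .set("spark.executor.cores", "1")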

At first the job processes batches faster than the batch interval (45s), but after a few
batches the processing time grows much longer (2.1 minutes at times).

This is the relevant code:

import sparkSession.implicits._

// Initial state: the last 10 hours of data from data_table, keyed by iid
val x: RDD[(String, Record)] = ss.sql(
    s"""select * from data_table
        where partition_ts >= ${DateTime.now.minusHours(10).toString("yyyyMMddHH")}""")
  .as[Record]
  .map(x => (x.iid, x))
  .rdd

// 16 state partitions, 48-hour timeout, seeded with the initial state above
val stateSpec = StateSpec.function(trackStateFunc _)
  .numPartitions(16)
  .timeout(Durations.minutes(60 * 48))
  .initialState(x)

val ssc = new StreamingContext(sc, Seconds(45))
val sqlContext = ss.sqlContext
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicSet)

// Parse each batch's JSON values into Records, key by iid, and run them through mapWithState
val stateStream = kafkaStream
  .transform(x => x.map(_._2))
  .transform { x =>
    val sparkSession = ss
    import sparkSession.implicits._
    sqlContext.read.schema(schema).json(x).as[Record].map(r => (r.iid, r)).rdd
  }
  .mapWithState(stateSpec)

// Append each batch's output to the joineddata table
stateStream.foreachRDD { x =>
  x.coalesce(16).toDF().write.mode(SaveMode.Append).insertInto("joineddata")
}


Right now, after playing with the parameters a bit, I'm running with
spark.memory.storageFraction=0 and spark.memory.fraction=0.2.
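
If I understand the unified memory manager correctly, those two settings leave only a thin slice of the heap for execution and storage, roughly:

    // Back-of-the-envelope arithmetic (the 300MB reserved figure is Spark's internal
    // constant, as far as I know):
    val heap     = 10L * 1024 * 1024 * 1024         // executor heap, 10G
    val reserved = 300L * 1024 * 1024               // reserved system memory
    val unified  = ((heap - reserved) * 0.2).toLong // spark.memory.fraction = 0.2 -> ~1.9 GB for execution + storage
    val storage  = (unified * 0.0).toLong           // spark.memory.storageFraction = 0 -> no protected storage region
    // which leaves roughly 7.8 GB of heap for user objects (and GC headroom)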

Any help would be appreciated.

Thank you,

Daniel
