Hi all,

I'm developing a Spark application in which I need to iteratively update an RDD over a large number of iterations (1000+). From reading online, I understand that I should call .checkpoint() periodically to keep the lineage graph from growing too large. Even when I do this, I keep getting StackOverflowErrors in the DAGScheduler, such as the one below. I've attached a sample application that illustrates what I'm trying to do. Can anyone point out how I can keep the DAG from growing so large that Spark is unable to process it?
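
In essence the loop looks like this (a simplified sketch of the attached example; initialNumbers, randomNumbers, and checkpointEvery are defined in the attachment):

    val numbers = (1 to iterations).foldLeft(initialNumbers) { case (numbersSoFar, i) =>
      val newNumbers = numbersSoFar
        .join(randomNumbers)
        .mapValues { case (num1, num2) => num1 + num2 }
        .persist(StorageLevel.MEMORY_AND_DISK_SER)

      numbersSoFar.unpersist(blocking = false)              // drop the previous iteration's blocks
      if (i % checkpointEvery == 0) newNumbers.checkpoint() // checkpoint every 10th iteration
      newNumbers
    }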

Thank you,
David


java.lang.StackOverflowError
        at scala.collection.generic.GenMapFactory$MapCanBuildFrom.scala$collection$generic$GenMapFactory$MapCanBuildFrom$$$outer(GenMapFactory.scala:57)
        at scala.collection.generic.GenMapFactory$MapCanBuildFrom.apply(GenMapFactory.scala:58)
        at scala.collection.generic.GenMapFactory$MapCanBuildFrom.apply(GenMapFactory.scala:57)
        at scala.collection.TraversableLike$class.$plus$plus(TraversableLike.scala:154)
        at scala.collection.AbstractTraversable.$plus$plus(Traversable.scala:105)
        at scala.collection.immutable.HashMap.$plus(HashMap.scala:60)
        at scala.collection.immutable.Map$Map4.updated(Map.scala:172)
        at scala.collection.immutable.Map$Map4.$plus(Map.scala:173)
        at scala.collection.immutable.Map$Map4.$plus(Map.scala:158)
        at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
        at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:24)
        at scala.collection.TraversableOnce$$anonfun$toMap$1.apply(TraversableOnce.scala:280)
        at scala.collection.TraversableOnce$$anonfun$toMap$1.apply(TraversableOnce.scala:279)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableOnce$class.toMap(TraversableOnce.scala:279)
        at scala.collection.AbstractTraversable.toMap(Traversable.scala:105)
        at org.apache.spark.storage.BlockManager$.blockIdsToBlockManagers(BlockManager.scala:1264)
        at org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:199)
        at org.apache.spark.scheduler.DAGScheduler.visit$3(DAGScheduler.scala:372)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getMissingParentStages(DAGScheduler.scala:389)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:774)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
        ... (last 4 lines repeating thousands of times)


--

David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.com | 434.299.0090x204

import com.typesafe.scalalogging.slf4j.Logging
import org.apache.spark.SparkContext._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

/**
 * To run:
 *
 * ./spark-submit --class SparkExample --master local --num-executors 8
 * --driver-memory 3600m --executor-memory 1800m app.jar [iterations] [num-partitions] [reducers]
 *
 */
object SparkExample extends App with Logging {
  val iterations = args.lift(0).map(_.toInt).getOrElse(1000)
  val partitions = args.lift(1).map(_.toInt).getOrElse(200)
  val reducers = args.lift(2).map(_.toInt).getOrElse(8)

  val conf = new SparkConf()
    .setAppName("SparkExample")
    .setMaster("local")
    .set("spark.serializer", classOf[KryoSerializer].getName)
    .set("spark.storage.memoryFraction", "0.4")
    .set("spark.kryoserializer.buffer.mb", "256")
    .set("spark.shuffle.consolidateFiles", "true")
  val sc = new SparkContext(conf)

  sc.setCheckpointDir("/tmp/checkpoint")

  // Static RDD of random (key, value) pairs that each iteration joins against
  val randomNumbers = sc.parallelize((0 until iterations).map { _ => (Random.nextLong(), Random.nextDouble()) })
    .repartition(partitions)
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
  randomNumbers.checkpoint()

  // Start every key at 0.0; this RDD is replaced by a new one on each iteration
  val initialNumbers = randomNumbers.keys.map { key => (key, 0.0) }
    .persist(StorageLevel.MEMORY_AND_DISK_SER)


  val checkpointEvery = 10 // checkpoint every this many iterations
  val numbers = (1 to iterations).foldLeft(initialNumbers) { case (numbersSoFar, i) =>
    logger.info(s"starting iteration $i")

    val newNumbers =
      numbersSoFar
        .join(randomNumbers)
        .mapValues{ case (num1, num2) => num1 + num2 }
        .union(randomNumbers)
        .reduceByKey((num1: Double, num2: Double) => num1 + num2, reducers)
        .repartition(partitions)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)

    // Drop the previous iteration's cached blocks so memory does not fill up (most important)
    numbersSoFar.unpersist(blocking = false)

    // Mark every Nth result for checkpointing; Spark writes the checkpoint data
    // once an action later computes this RDD
    if (i % checkpointEvery == 0) newNumbers.checkpoint()

    newNumbers
  }

  logger.info(s"numbers:\n${numbers.take(10).map { case (key, value) => s"key: $key, value: $value"}.mkString("\n")}")
  logger.info(s"Received ${numbers.count()} numbers")

}

