My program in pseudocode looks like this:

    val conf = new SparkConf().setAppName("Test")
      .set("spark.storage.memoryFraction", "0.2") // default 0.6
      .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
      .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
      .set("spark.shuffle.consolidateFiles", "true") // helpful for "too many files open"
      .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
      .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true
      .set("spark.akka.askTimeout", "30")
      .set("spark.shuffle.compress", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
      .set("spark.file.transferTo", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
      .set("spark.core.connection.ack.wait.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
      .set("spark.speculation", "true")
      .set("spark.worker.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
      .set("spark.akka.timeout", "300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
      .set("spark.storage.blockManagerSlaveTimeoutMs", "120000")
      .set("spark.driver.maxResultSize", "2048") // in response to: "Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)"
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.att.bdcoe.cip.ooh.MyRegistrator")
      .set("spark.kryo.registrationRequired", "true")

    val rdd1 = sc.textFile(file1)
      .persist(StorageLevel.MEMORY_AND_DISK_SER)
      .map(_.split("\\|", -1))
      ...
      .filter(...)

    val rdd2 = sc.textFile(file2)
      .persist(StorageLevel.MEMORY_AND_DISK_SER)
      .map(_.split("\\|", -1))
      ...
      .filter(...)

    rdd2.union(rdd1)
      .map(...)
      .filter(...)
      .groupByKey()
      .map(...)
      .flatMap(...)
      .saveAsTextFile(...)


I run the code with:
  --num-executors 500 \
  --driver-memory 20g \
  --executor-memory 20g \
  --executor-cores 32
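
For completeness, the full launch looks roughly like this (the class name, jar, and input paths below are placeholders; the real job runs on YARN):

    spark-submit \
      --master yarn-cluster \
      --class com.example.MyJob \
      --num-executors 500 \
      --driver-memory 20g \
      --executor-memory 20g \
      --executor-cores 32 \
      my-job-assembly.jar hdfs:///path/to/file1 hdfs:///path/to/file2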


I'm using Kryo serialization for everything, including broadcast variables.
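
For reference, the registrator named in the config is along these lines (a simplified sketch; the real class registers many more of our own types, and the classes shown here are just examples):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // Sketch of com.att.bdcoe.cip.ooh.MyRegistrator. With
    // spark.kryo.registrationRequired=true, every serialized class must be listed.
    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[Array[String]]) // records after split("\\|", -1)
        // ... plus our own case classes used in the map/filter/flatMap steps
      }
    }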

Spark creates 145k tasks, and the first stage includes everything before
groupByKey(). It fails before getting to groupByKey. I have tried doubling
and tripling the number of partitions when calling textFile, with no
success.
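
To be concrete about the partition attempts: I pass a larger minPartitions hint to textFile, roughly like this (the count below is illustrative, about double the ~145k tasks I currently see):

    import org.apache.spark.storage.StorageLevel

    // minPartitions is only a hint; the Hadoop input splits set a lower bound.
    val rdd1 = sc.textFile(file1, minPartitions = 290000)
      .persist(StorageLevel.MEMORY_AND_DISK_SER)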

Very similar code (with trivial changes to accommodate different input) worked
on a smaller input (~8 TB)... not that it was easy to get that working.



Errors vary; here is what I am getting right now:

ERROR SendingConnection: Exception while reading SendingConnection
... java.nio.channels.ClosedChannelException
(^ guessing that is a symptom of something else)

WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms
(^ guessing that is a symptom of something else)

ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting down
ActorSystem [sparkDriver]
*java.lang.OutOfMemoryError: GC overhead limit exceeded*



Other times I get "executor lost..." messages, roughly one per second, after
~50k tasks complete, until there are almost no executors left and progress
slows to nothing.

I ran with verbose GC logging; I do see failing YARN containers with multiple
(around 30) "Full GC" messages, but I don't know how to tell whether that is
the problem. Typical Full GC times look OK: [Times: user=23.30 sys=0.06,
real=1.94 secs]
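
In case it matters, this is roughly how the GC logging is switched on (via executor Java options; the exact flag set I used may have differed slightly):

    // Added to the SparkConf above so the YARN container logs include GC details.
    conf.set("spark.executor.extraJavaOptions",
      "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")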



Suggestions, please?

Huge thanks for useful suggestions,
Arun
