Re: java.lang.OutOfMemoryError at simple local test

2014-11-26 Thread rzykov
We changed the code (it now generates 1000 * 1000 elements) and raised the
memory limits to 100M:

def generate = {
  for {
    j <- 1 to 10
    i <- 1 to 1000
  } yield (j, i)
}

~/soft/spark-1.1.0-bin-hadoop2.3/bin/spark-submit --master local \
  --executor-memory 100M --driver-memory 100M --class Spill --num-executors 1 \
  --executor-cores 1 target/scala-2.10/Spill-assembly-1.0.jar

The result of this: 
14/11/24 14:57:40 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception
in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded

We examined this run with a profiler and took this screenshot:

Each element of the collection takes 48 bytes. Each element is a scala.Tuple2
of 2 boxed values. But Scala supports @specialized unboxed primitive types
such as Int, which takes only 4 bytes.
From this point of view the collection should take about 1000 * 1000 * 2
* 4 bytes = 8 MB plus some overhead.
That is about 5 times less than the memory consumption we currently observe.
Why doesn't Spark use primitive (@specialized) types in this case?
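
The boxed-versus-primitive distinction above can be illustrated with a small sketch in plain Scala, with no Spark involved. It is not a statement about what Spark does internally. It only shows, as an assumption about one possible factor, that a tuple built inside code that is generic in its component types is the plain scala.Tuple2 holding boxed values, and it repeats the 8 MB estimate. The names TupleBoxing, pairGeneric and primitivePayloadBytes are hypothetical and do not come from the original code.

```scala
object TupleBoxing {
  // Built where both component types are statically known to be Int.
  val direct: (Int, Int) = (1, 2)

  // Built inside a method that is generic in A and B: the compiler only sees
  // type parameters here, so the Ints arrive as boxed objects.
  def pairGeneric[A, B](a: A, b: B): (A, B) = (a, b)
  val generic: (Int, Int) = pairGeneric(1, 2)

  // Raw payload if every element were just two unboxed 4-byte Ints.
  def primitivePayloadBytes(elements: Long): Long = elements * 2L * 4L

  def main(args: Array[String]): Unit = {
    println(direct.getClass.getName)   // runtime class of the directly built tuple
    println(generic.getClass.getName)  // runtime class of the generically built tuple
    println(primitivePayloadBytes(1000L * 1000L)) // bytes for 1000 * 1000 pairs
  }
}
```

On Scala 2.x the directly built tuple is typically an instance of a specialized subclass of Tuple2, while the generically built one is the plain scala.Tuple2 class. The two values still compare as equal.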

2014-11-22 Thread rzykov
Dear all, 

Unfortunately I have not received any response on the user forum. That's why I
decided to post this question here.
We have encountered jobs that fail on large amounts of data. For example, an
application works perfectly with relatively small data, but fails when the
data grows by a factor of 2.

A simple local test was prepared for this question at
It generates 2 sets of key-value pairs, joins them, selects the distinct
values and finally counts them.

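import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD operations such as join (Spark 1.1)

object Spill {
  // Generates 10 * 200 (key, value) pairs: keys 1..10, values 1..200.
  def generate = {
    for {
      j <- 1 to 10
      i <- 1 to 200
    } yield (j, i)
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(getClass.getSimpleName)
    conf.set("spark.shuffle.spill", "true") // config values are set as strings
    val sc = new SparkContext(conf)
    val dataA = sc.parallelize(generate)
    val dataB = sc.parallelize(generate)
    // Join the two sets on the key, keep distinct records, count them.
    val dst = dataA.join(dataB).distinct().count()
  }
}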
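
For reference, the same generate, join, distinct, count pipeline can be sketched with plain Scala collections instead of RDDs, to see how many records the join produces. This is only a rough local stand-in, not Spark's join implementation. LocalSpill, its join helper and the parameter n (the upper bound of i) are hypothetical names introduced for illustration.

```scala
object LocalSpill {
  // Same shape as generate in the post, with the upper bound of i as a parameter.
  def generate(n: Int): Seq[(Int, Int)] =
    for {
      j <- 1 to 10
      i <- 1 to n
    } yield (j, i)

  // Inner join on the key using plain collections (a stand-in for RDD.join):
  // every value of a is paired with every value of b that has the same key.
  def join(a: Seq[(Int, Int)], b: Seq[(Int, Int)]): Seq[(Int, (Int, Int))] = {
    val bByKey: Map[Int, Seq[Int]] =
      b.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
    for {
      (k, v) <- a
      w <- bByKey.getOrElse(k, Seq.empty[Int])
    } yield (k, (v, w))
  }

  // Number of distinct joined records for a given upper bound of i.
  def distinctJoinCount(n: Int): Int =
    join(generate(n), generate(n)).distinct.size

  def main(args: Array[String]): Unit = {
    println(distinctJoinCount(200)) // 10 keys * 200 * 200 joined records
    println(distinctJoinCount(100)) // 10 keys * 100 * 100 joined records
  }
}
```

Each input set has only 10 * n records, but the join yields 10 * n * n records, so the intermediate data is much larger than the input.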

We compiled it locally and ran it 3 times with different memory settings:
1) --executor-memory 10M --driver-memory 10M --num-executors 1
--executor-cores 1
It fails with java.lang.OutOfMemoryError: GC overhead limit exceeded at

2) --executor-memory 20M --driver-memory 20M --num-executors 1
--executor-cores 1
It works OK 

3) --executor-memory 10M --driver-memory 10M --num-executors 1
--executor-cores 1, but with less data: the upper bound of i is reduced from
200 to 100. This halves the input data and reduces the joined data by a
factor of 4.

  def generate = {
    for {
      j <- 1 to 10
      i <- 1 to 100   // previous value was 200
    } yield (j, i)
  }
This code works OK. 

We don't understand why 10M is not enough for such a simple operation on
approximately 32000 bytes of ints (2 * 10 * 200 * 2 * 4). 10M of RAM is
enough if we halve the data volume (2000 records of (int, int)).
Why doesn't spilling to disk cover this case?
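
The raw-size arithmetic above can be checked with a short sketch. It counts only the 4-byte Int payload of the two input sets and ignores object headers, boxing and anything Spark allocates itself, so it is a lower bound and not a measurement. RawSizeEstimate and its methods are hypothetical names used for illustration.

```scala
object RawSizeEstimate {
  val BytesPerInt: Long = 4L

  // Records across both input sets: 2 sets * keys * values per key.
  def totalRecords(keys: Int, valuesPerKey: Int): Long =
    2L * keys * valuesPerKey

  // Each record is an (Int, Int) pair, so 2 Ints of 4 bytes each.
  def rawBytes(keys: Int, valuesPerKey: Int): Long =
    totalRecords(keys, valuesPerKey) * 2L * BytesPerInt

  def main(args: Array[String]): Unit = {
    for (n <- Seq(200, 100)) {
      println(s"i up to $n: records = ${totalRecords(10, n)}, raw bytes = ${rawBytes(10, n)}")
    }
  }
}
```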

