Hey Spark Users,

I keep getting java.lang.OutOfMemoryError: Java heap space while reading a
large number of JSON files iteratively via read.json(). Even though the
resulting RDD is rather small, I still hit the OOM error. The rough structure
of my program is as follows, in pseudo-code:

file_path_list.map { jsonFile: String =>
  sqlContext.read.json(jsonFile)
    .select($"some", $"fields")
    .withColumn("new_col", some_transformations($"col"))
    .rdd
    .map { x: Row => (k, v) }
    .combineByKey(...) // groups one column into item lists, keyed by another column
}.reduce( (i, j) => i.union(j) )
 .combineByKey(...)   // combines the results from all json files
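For context, the per-file combineByKey is essentially a "group values into a
list per key" step. A minimal sketch of that shape (the key/value types and
combiner functions here are illustrative, not my exact code):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Illustrative only: collect all values for each key into a List,
// which is what the per-file combineByKey above is meant to do.
def groupIntoLists(pairs: RDD[(String, String)]): RDD[(String, List[String])] =
  pairs.combineByKey(
    (v: String) => List(v),                        // createCombiner
    (acc: List[String], v: String) => v :: acc,    // mergeValue
    (a: List[String], b: List[String]) => a ::: b  // mergeCombiners
  )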

I should mention that some of the JSON files are several gigabytes in size,
yet the combined RDD is only a few megabytes. I'm not familiar with the
under-the-hood mechanism, but my intuitive understanding of how the code
executes is: read one file at a time (I can easily change the map over
file_path_list to a foreach, if that's what it takes), do the inner
transformation and combine on that file's DataFrame, then reduce and do the
outer combine immediately, which should not require holding the RDDs
generated from all files in memory at once. Since my code raises the OOM
error anyway, I must have missed something important.

From the debug log, I can tell the OOM error happens while reading the same
file, which is a modest 2 GB in size, while spark.driver.memory is set to
13 GB and the available memory before the code runs is around 8 GB, on my
standalone machine running as "local[8]".

To work around this, I also tried initializing an empty RDD variable, reading
one file at a time with foreach and, instead of reduce, simply unioning each
per-file RDD into it, but the OOM error remains.
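
In code, that second attempt was roughly the following sketch, where
perFilePairs stands in for the per-file read/select/withColumn/map/combineByKey
pipeline from the snippet above:

import org.apache.spark.rdd.RDD

// perFilePairs is a placeholder for the per-file pipeline shown earlier
// (read.json -> select/withColumn -> map to (k, v) -> combineByKey into lists).
def perFilePairs(jsonFile: String): RDD[(String, List[String])] = ???

// Accumulate into a single RDD via foreach + union, instead of reduce over a list of RDDs.
var combined: RDD[(String, List[String])] = sc.emptyRDD[(String, List[String])]
file_path_list.foreach { jsonFile =>
  combined = combined.union(perFilePairs(jsonFile))
}
// ...then the outer combineByKey runs on `combined`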

Other configurations (set on the SparkConf, roughly as sketched after this list):

   - set("spark.storage.memoryFraction", "0.1") // no RDD caching is used
   - set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
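
For completeness, this is roughly how the context is configured in my run (a
sketch; the app name is illustrative, and the driver memory is given at launch
time, e.g. --driver-memory 13g, rather than in code):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch of my setup: standalone machine, local[8], 13 GB driver memory.
val conf = new SparkConf()
  .setAppName("json-combine")                  // name is illustrative
  .setMaster("local[8]")
  .set("spark.storage.memoryFraction", "0.1")  // no RDD cache is used
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)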

Any suggestions other than scaling the Spark cluster up or out?

BR,
Todd Leo