Re: OutOfMemoryError When Reading Many json Files
Hi

Why don't you check whether you can process the large file standalone
first, and then add the outer loop over the file list next:

    sqlContext.read.json(jsonFile)
      .select($"some", $"fields")
      .withColumn("new_col", some_transformations($"col"))
      .rdd.map( x: Row => (k, v) )
      .combineByKey()

Deenar

On 14 October 2015 at 05:18, SLiZn Liu wrote:
> [...]
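
For what it's worth, the standalone test could be sketched roughly like
this. The path, column names, and key/value extraction are placeholders,
not the poster's actual code, and the combineByKey functions are one
possible spelling of the "item lists" grouping described below; assumes
Spark 1.x with a live sqlContext:

    // Sketch: process the single large file on its own, no outer loop.
    // Placeholder path and field names; assumes an existing SQLContext.
    import sqlContext.implicits._

    val df = sqlContext.read.json("/path/to/large_file.json")
      .select($"some", $"fields")

    // combineByKey needs its three functions spelled out; here each
    // key's values are collected into a list, matching the grouping
    // described in the original post.
    val combined = df.rdd
      .map(row => (row.getAs[String]("some"), row.getAs[String]("fields")))
      .combineByKey(
        (v: String) => List(v),                       // createCombiner
        (acc: List[String], v: String) => v :: acc,   // mergeValue
        (a: List[String], b: List[String]) => a ::: b // mergeCombiners
      )

    combined.count() // forces evaluation, so any OOM reproduces here

If this single-file job already blows the heap, the outer loop is not the
culprit.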
Re: OutOfMemoryError When Reading Many json Files
Yes, it goes wrong even when processing the large file on its own. When I
removed the transformations on the DF it worked just fine, but even a
simple filter operation on the DF became the straw that broke the camel's
back. That's confusing.

On Wed, Oct 14, 2015 at 2:11 PM Deenar Toraskar wrote:
> Hi
>
> Why don't you check if you can just process the large file standalone
> and then do the outer loop next.
> [...]
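
That narrowing-down could be captured as a minimal two-step repro: the bare
read reportedly succeeds, while adding a single filter fails. Path and
column name are placeholders; assumes Spark 1.x with a live sqlContext:

    // Minimal repro sketch. Placeholder path and column name.
    import sqlContext.implicits._

    val df = sqlContext.read.json("/path/to/large_file.json")
    df.count()                        // step 1: plain read -- reportedly fine

    df.filter($"some_col".isNotNull)  // step 2: one simple filter
      .count()                        // -- reportedly triggers the heap OOM

A repro this small also makes it easy to watch heap usage between the two
steps.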
OutOfMemoryError When Reading Many json Files
Hey Spark Users,

I keep getting java.lang.OutOfMemoryError: Java heap space as I read a
massive number of json files iteratively via read.json(). Even though the
resulting RDD is rather small, I still get the OOM error. The brief
structure of my program reads as follows, in pseudo-code:

    file_path_list.map { jsonFile: String =>
      sqlContext.read.json(jsonFile)
        .select($"some", $"fields")
        .withColumn("new_col", some_transformations($"col"))
        .rdd.map( x: Row => (k, v) )
        .combineByKey() // groups a column into item lists, keyed by another column
    }.reduce( (i, j) => i.union(j) )
      .combineByKey() // combines results from all json files

I confess some of the json files are gigabytes huge, yet the combined RDD
is only a few megabytes. I'm not familiar with the under-the-hood
mechanism, but my intuitive understanding of how the code executes is:
read the files one at a time (where I could easily change map to foreach
when fetching from file_path_list, if that's what it takes), do the inner
transformation on the DF and combine, then reduce and do the outer combine
immediately, which shouldn't require holding the RDDs generated from all
files in memory at once. Obviously, since my code raises the OOM error, I
must have missed something important.

From the debug log, I can tell the OOM error happens while reading the
same file every time, which is a modest 2GB in size, while driver.memory
is set to 13GB and the available memory before execution is around 8GB, on
my standalone machine running as "local[8]".

To overcome this, I also tried initializing an empty universal RDD
variable, iteratively reading one file at a time using foreach, and then,
instead of reduce, simply combining each RDD generated from the json
files, but the OOM error remains.

Other configurations:

- set("spark.storage.memoryFraction", "0.1") // no RDD caching is used
- set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Any suggestions other than scaling the Spark cluster up/out?

BR,
Todd Leo
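
For reference, the pseudo-code above could be made concrete roughly as
follows. The field names, the key/value extraction, some_transformations,
and the list-merging logic are all placeholders (the outer combine is
written here as reduceByKey, a special case of combineByKey); assumes
Spark 1.x with a live sqlContext:

    // Sketch of the per-file pipeline plus the outer combine.
    // Placeholder names throughout; not the poster's actual code.
    import org.apache.spark.rdd.RDD
    import sqlContext.implicits._

    def readOne(jsonFile: String): RDD[(String, List[String])] =
      sqlContext.read.json(jsonFile)
        .select($"some", $"fields")
        .withColumn("new_col", some_transformations($"col"))
        .rdd
        .map(row => (row.getAs[String]("some"), row.getAs[String]("new_col")))
        .combineByKey(
          (v: String) => List(v),                       // createCombiner
          (acc: List[String], v: String) => v :: acc,   // mergeValue
          (a: List[String], b: List[String]) => a ::: b // mergeCombiners
        )

    val result = file_path_list.map(readOne)
      .reduce((i, j) => i.union(j))
      .reduceByKey(_ ::: _) // outer combine: merge per-file lists per key

Note that everything here is lazy: nothing reads a byte until an action
runs on result, at which point Spark evaluates the unioned lineage of all
files in one job.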