Hello Brad,

The shuffle operation seems to be taking more memory than your Java process can provide. I am not sure whether you have already tried these, but there are a few basic things you can try.

1. If you are running a local standalone Spark cluster, you can set the amount of memory each worker can use by setting spark.executor.memory. See the Spark configuration guide<http://spark.incubator.apache.org/docs/latest/configuration.html>.

2. You can increase the number of partitions created from the text file, and increase the number of reducers. This lowers the memory usage of each task (a sketch of both changes is below). See the Spark tuning guide<http://spark.incubator.apache.org/docs/latest/tuning.html#other-considerations>.

3. If you are still running out of memory, you can try the latest Spark 0.9<https://github.com/apache/incubator-spark/tree/branch-0.9> (or the master branch). Shuffles have been modified to automatically spill to disk when they need more memory than is available. 0.9 is undergoing community voting and will be released very soon.
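For example, applied to your job, points 1 and 2 would look roughly like this (an untested sketch; the 4g and 64 values are placeholders to tune for your machine, and the memory property only helps when executors run in separate JVMs, i.e. against a standalone master rather than plain local mode):

// In 0.8.x the executor memory is read from a Java system property, so it
// must be set before the SparkContext is created (it has no effect on a
// context that already exists, e.g. an already-running spark-shell).
System.setProperty("spark.executor.memory", "4g")

// Read the input into more, smaller partitions (the second argument to
// textFile is a minimum number of splits) and use more reduce tasks, so
// each reducer's in-memory map stays small.
val textFile = sc.textFile("slices.txt", 64)
val counts = textFile.map(line => getArray(line))
                     .flatMap(a => getCombinations(a))
                     .reduceByKey(_ + _, 64)
counts.saveAsTextFile("spark_test")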
TD

On Sat, Jan 18, 2014 at 4:58 PM, Brad Ruderman <[email protected]> wrote:

> Hi All-
> I am very new to Spark (and Scala) but I was hoping to get some help
> creating a distributed job. Essentially I have a set of files (8k at 300mb
> each in HDFS) which were created in Hive (Hadoop) with the form:
> Int
> Array<Map<int,string>>
>
> I want to create a Spark job that reads these files and creates a
> combination of the int array. Meaning it iterates over every int, ignoring
> the string, in the Array<Map<int,string>> and produces a [k,v] of
> (int_1, int_2), 1
> Then it reduces by the k and sums the v to produce a final result over a
> large dataset of:
> (int_1, int_2), count
>
> It is producing a set of all possible combinations for that given record.
> In order to do this I have locally downloaded a single file and am running
> Spark locally on my computer to try to process a single file. The file
> is 382715 lines (~400mb), however some of the arrays can be quite big.
>
> My Spark job looks like the following:
>
> import collection.mutable.HashMap
> import collection.mutable.ListBuffer
>
> // Map every pair of ids in the (sorted) record to a count of 1.
> def getCombinations(a: List[Int]): HashMap[(Int,Int),Int] = {
>   val comboMap = new HashMap[(Int,Int),Int]()
>   var aSorted = a.sorted
>   for (x <- 0 to aSorted.length - 1){
>     var b = aSorted(0)
>     aSorted = aSorted.drop(1)
>     for (y <- 0 to aSorted.length - 1){
>       comboMap((b, aSorted(y))) = 1
>     }
>   }
>   return comboMap
> }
>
> // Parse one Hive-delimited line (\x01/\x02/\x03 separators) into its ids.
> def getArray(line: String): List[Int] = {
>   var a = line.split("\\x01")(1).split("\\x02")
>   var ids = new ListBuffer[Int]
>   for (x <- 0 to a.length - 1){
>     ids += Integer.parseInt(a(x).split("\\x03")(0))
>   }
>   return ids.toList
> }
>
> val textFile = sc.textFile("slices.txt")
> val counts = textFile.map(line => getArray(line)).flatMap(a =>
> getCombinations(a)).reduceByKey(_ + _)
> counts.saveAsTextFile("spark_test")
>
>
> I am using getArray() to process the text file from its Hive storage form,
> and then getCombinations() to create the k,v list, then finally reducing
> over this list. Eventually I will move this to a distributed cluster but I
> was hoping to determine if there is any way to parallelize or optimize this
> job for my local computer. Currently it fails with:
>
> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java heap space)
>
> org.apache.spark.util.AppendOnlyMap.growTable(AppendOnlyMap.scala:200)
> org.apache.spark.util.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:180)
> org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:130)
> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>
> Thanks,
> Brad
>
