Hello Brad,

The shuffle operation seems to be using more memory than your Java program's
heap can provide. I am not sure whether you have already tried these, but
there are a few basic things you can try.
1. If you are running a local standalone Spark cluster, you can set the
amount of memory each executor can use by setting spark.executor.memory.
See the Spark configuration guide
<http://spark.incubator.apache.org/docs/latest/configuration.html>.
(There is a small sketch of this after the list.)
2. You can increase the number of partitions created from the text file
and increase the number of reducers; this lowers the memory usage of each
task. See the Spark tuning guide
<http://spark.incubator.apache.org/docs/latest/tuning.html#other-considerations>.
(See the second sketch after the list.)
3. If you are still running out of memory, you can try our latest Spark 0.9
<https://github.com/apache/incubator-spark/tree/branch-0.9> (or the master
branch). Shuffles have been modified to automatically spill to disk when
they need more memory than is available. 0.9 is undergoing community voting
and will be released very soon.
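
For (1), here is a rough sketch of what that could look like if you construct
the SparkContext yourself on 0.9 or master; the 4g value, the master URL, and
the app name are only placeholders (on 0.8 you can set the same property with
System.setProperty before creating the context):

import org.apache.spark.{SparkConf, SparkContext}

// Placeholder values -- adjust the memory and master to your machine.
val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("combinations")
  .set("spark.executor.memory", "4g")
val sc = new SparkContext(conf)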
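
For (2), the partition counts can be passed directly to textFile and
reduceByKey. Reusing the functions from your mail below, with 64 as an
example value:

// More input partitions and more reduce tasks mean smaller per-task hash maps.
val textFile = sc.textFile("slices.txt", 64)  // minSplits
val counts = textFile.map(line => getArray(line))
  .flatMap(a => getCombinations(a))
  .reduceByKey(_ + _, 64)  // number of reduce tasks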

TD


On Sat, Jan 18, 2014 at 4:58 PM, Brad Ruderman <[email protected]> wrote:

> Hi All-
> I am very new to Spark (and Scala), but I was hoping to get some help
> creating a distributed job. Essentially I have a set of files (8k files at
> 300 MB each in HDFS) which were created in Hive (Hadoop) with the form:
> Int
> Array<Map<int,string>>
>
> I want to create a Spark job that reads these files and creates the
> combinations of the ints in the array. That is, it iterates over every int
> (ignoring the string) in the Array<Map<int,string>> and produces a [k,v]
> pair of ((int_1, int_2), 1). Then it reduces by the key and sums the values
> to produce a final result over a large dataset of:
> ((int_1, int_2), count)
>
> This produces the set of all possible combinations for a given record.
> In order to do this I have downloaded a single file locally and am running
> Spark locally on my computer to try to process that single file. The file
> is 382,715 lines (~400 MB); however, some of the arrays can be quite big.
>
> My Spark job looks like the following:
>
> import collection.mutable.HashMap
> import collection.mutable.ListBuffer
>
> // For a sorted copy of the list, map every (earlier, later) pair to 1;
> // duplicate pairs within a record collapse into a single key.
> def getCombinations(a: List[Int]): HashMap[(Int, Int), Int] = {
>   val comboMap = new HashMap[(Int, Int), Int]()
>   var aSorted = a.sorted
>   for (x <- 0 to aSorted.length - 1) {
>     val b = aSorted(0)
>     aSorted = aSorted.drop(1)
>     for (y <- 0 to aSorted.length - 1) {
>       comboMap((b, aSorted(y))) = 1
>     }
>   }
>   comboMap
> }
>
> // Parse one Hive-delimited line and return the ints from the array column.
> def getArray(line: String): List[Int] = {
>   val a = line.split("\\x01")(1).split("\\x02")
>   val ids = new ListBuffer[Int]
>   for (x <- 0 to a.length - 1) {
>     ids += Integer.parseInt(a(x).split("\\x03")(0))
>   }
>   ids.toList
> }
>
> val textFile = sc.textFile("slices.txt")
> val counts = textFile.map(line => getArray(line))
>   .flatMap(a => getCombinations(a))
>   .reduceByKey(_ + _)
> counts.saveAsTextFile("spark_test")
>
>
> I am using getArray() to process the text file from its Hive storage form,
> and then getCombinations() to create the [k,v] list, and finally reducing
> over this list. Eventually I will move this to a distributed cluster, but I
> was hoping to determine if there is any way to parallelize or optimize this
> job for my local computer. Currently it fails with:
>
> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java heap space)
>
> org.apache.spark.util.AppendOnlyMap.growTable(AppendOnlyMap.scala:200)
> org.apache.spark.util.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:180)
> org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:130)
> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
> Thanks,
> Brad
>
