Hi Tathagata-
Thanks for your response. I appreciate your help. A few follow-up questions:
1) I updated to master and tried to set the system properties. However,
when I view the properties in the web console, I don't see them listed.
I also see in Activity Monitor that org.apache.spark.repl.Main is only
taking 758mb.

scala> System.setProperty("spark.executor.memory","4gb")
res0: String = null

scala> System.setProperty("spark.executor.memory","4gb")
res1: String = 4gb

Did something change in master that would prevent this from being set
higher in standalone mode? My exact repro steps are:

git clone [email protected]:apache/incubator-spark.git
./sbt/sbt assembly
cd conf
cp spark-env.sh.template spark-env.sh
cp log4j.properties.template log4j.properties
./bin/spark-shell
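
Or does the property have to be set before the SparkContext is created? If
so, the only thing I could think of from the shell is something like the
rough sketch below (untested on my end; the "4g" format and the local[8]
master are just guesses based on the configuration docs, so please correct
me if this is wrong):

scala> sc.stop()   // throw away the shell's existing context
scala> System.setProperty("spark.executor.memory", "4g")   // docs show 512m/2g-style values
scala> val sc2 = new org.apache.spark.SparkContext("local[8]", "spark-shell")   // or your real master URL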

2) My next step is going to be to test this job on an Amazon EC2 cluster,
but is there anything I can do locally to make sure the job is structured
so that it can be heavily parallelized? I also tried increasing the level
of parallelism by passing a second parameter to reduceByKey, but I didn't
see any difference in performance, and my local machine doesn't seem to be
under much load either (which I would expect it to be; I am running a MB
Pro Retina, i7 + 16gb RAM). I am worried that using ListBuffer and HashMap
will significantly slow things down and that there are better ways to do
this (a rough sketch of the kind of rewrite I mean is below).
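
Concretely, something like the following (untested sketch; it uses the same
slices.txt file and \x01/\x02/\x03 parsing as the code quoted at the bottom
of this thread, and the partition count is just a guess):

import org.apache.spark.SparkContext._   // no-op in the shell, needed in a standalone app

// Parse the ids out of a line without building an intermediate ListBuffer.
def parseIds(line: String): List[Int] =
  line.split("\\x01")(1).split("\\x02").map(_.split("\\x03")(0).toInt).toList

// Emit the sorted pairs lazily instead of materializing a per-record HashMap.
// NB: if a record contains duplicate ints this counts repeated pairs, whereas
// the HashMap version collapses them to 1 within a record.
def pairs(a: List[Int]): Iterator[((Int, Int), Int)] = {
  val s = a.sorted
  s.tails.flatMap {
    case head :: rest => rest.iterator.map(x => ((head, x), 1))
    case Nil          => Iterator.empty
  }
}

val numPartitions = 16   // placeholder; tune to cores/memory
val counts = sc.textFile("slices.txt", numPartitions)
  .flatMap(line => pairs(parseIds(line)))
  .reduceByKey(_ + _, numPartitions)
counts.saveAsTextFile("spark_test")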


Thanks,
Brad





On Sat, Jan 18, 2014 at 7:11 PM, Tathagata Das
<[email protected]> wrote:

> Hello Brad,
>
> The shuffle operation seems to be taking more memory than your Java
> process can provide. I am not sure whether you have already tried these or
> not, but there are a few basic things you can try.
> 1. If you are running a local standalone Spark cluster, you can set the
> amount of memory that the worker can use by setting spark.executor.memory.
> See the Spark configuration page:
> http://spark.incubator.apache.org/docs/latest/configuration.html
> 2. You can increase the number of partitions created from the text file,
> and increase the number of reducers. This lowers the memory usage of each
> task. See the Spark tuning guide:
> http://spark.incubator.apache.org/docs/latest/tuning.html#other-considerations
> 3. If you are still running out of memory, you can try our latest Spark
> 0.9 (https://github.com/apache/incubator-spark/tree/branch-0.9) or the
> master branch. Shuffles have been modified to automatically spill to disk
> when they need more than the available memory. 0.9 is undergoing community
> voting and will be released very soon.
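>
> For example, with your code from below that would look roughly like this
> (just a sketch; the 64s are placeholders to tune for your data and cluster):
>
> val textFile = sc.textFile("slices.txt", 64)
> val counts = textFile.map(line => getArray(line)).flatMap(a =>
>   getCombinations(a)).reduceByKey(_ + _, 64)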
>
> TD
>
>
> On Sat, Jan 18, 2014 at 4:58 PM, Brad Ruderman <[email protected]> wrote:
>
>> Hi All-
>> I am very new to Spark (and Scala), but I was hoping to get some help
>> creating a distributed job. Essentially I have a set of files (8k at 300mb
>> each in HDFS) which were created in Hive (Hadoop) with the schema:
>> Int
>> Array<Map<int,string>>
>>
>> I want to create a Spark job that reads these files and generates the
>> combinations of the ints in each array. That is, it iterates over every
>> int in the Array<Map<int,string>>, ignoring the strings, and produces a
>> [k,v] of ((int_1, int_2), 1). It then reduces by the key and sums the
>> values to produce a final result over the large dataset of:
>> (int_1, int_2), count
>>
>> It produces the set of all possible pair combinations for each record. To
>> test this, I downloaded a single file and am running Spark locally on my
>> computer to process it. The file is 382715 lines (~400mb), however some of
>> the arrays can be quite big.
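>> For example, a record whose array holds the ints 7, 3 and 5 (strings
>> ignored) should emit ((3,5), 1), ((3,7), 1) and ((5,7), 1).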
>>
>> My Spark Job looks like the following:
>>
>> import collection.mutable.HashMap
>> import collection.mutable.ListBuffer
>>
>> def getCombinations(a: List[Int]): HashMap[(Int,Int),Int] = {
>>   val comboMap = new HashMap[(Int,Int),Int]()
>>   var aSorted = a.sorted
>>   for (x <- 0 to aSorted.length - 1) {
>>     var b = aSorted(0)
>>     aSorted = aSorted.drop(1)
>>     for (y <- 0 to aSorted.length - 1) {
>>       comboMap((b, aSorted(y))) = 1
>>     }
>>   }
>>   return comboMap
>> }
>>
>> def getArray(line: String): List[Int] = {
>>   var a = line.split("\\x01")(1).split("\\x02")
>>   var ids = new ListBuffer[Int]
>>   for (x <- 0 to a.length - 1) {
>>     ids += Integer.parseInt(a(x).split("\\x03")(0))
>>   }
>>   return ids.toList
>> }
>>
>> val textFile = sc.textFile("slices.txt")
>> val counts = textFile.map(line => getArray(line)).flatMap(a =>
>> getCombinations(a)).reduceByKey(_ + _)
>> counts.saveAsTextFile("spark_test")
>>
>>
>> I am using getArray() to parse each line out of its Hive storage format,
>> then getCombinations() to create the k,v pairs, and finally reducing over
>> them. Eventually I will move this to a distributed cluster, but I was
>> hoping to determine whether there is any way to parallelize or optimize
>> this job on my local computer. Currently it fails with:
>>
>> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java heap space)
>>
>> org.apache.spark.util.AppendOnlyMap.growTable(AppendOnlyMap.scala:200)
>> org.apache.spark.util.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:180)
>> org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:130)
>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>
>> Thanks,
>> Brad
>>
>
>
