1) System properties must be set before creating the SparkContext. When
using the Spark shell (which creates the context automatically), you have
to set them before starting the shell. See the configuration page
<http://spark.incubator.apache.org/docs/latest/configuration.html#environment-variables>
to see how to set Java properties through environment variables.
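For example, with a build of this era you can pass the property into the shell's JVM through SPARK_JAVA_OPTS before launching it; the 4g value below is purely illustrative:

```shell
# Set a Java system property for the shell's JVM before it starts.
# SPARK_JAVA_OPTS is read by the launch scripts (it can also be set in
# conf/spark-env.sh); 4g is just an example value, size it to your machine.
export SPARK_JAVA_OPTS="-Dspark.executor.memory=4g"
./bin/spark-shell
```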

2) Regarding correctness of computation: if it works locally, it should work
on the cluster. Regarding parallelism: Spark runs only as many tasks
simultaneously as there are cores that the SparkContext has asked
for. So on your local machine it doesn't help much to increase the number
of tasks, as it only has 4 cores (or 8 hyperthreaded ones, maybe).
Having 10x more tasks with each task doing 1/10th of the computation ends up
taking more or less the same amount of processing time. With more cores in
a cluster, it will parallelize further. The number of tasks should be
configured to be about 2-3x the number of cores in the cluster. See the tuning page
<http://spark.incubator.apache.org/docs/latest/tuning.html#level-of-parallelism>.
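As a concrete sketch, the parallelism level can be passed straight to reduceByKey. This assumes a live SparkContext and an existing pair RDD named `pairs`, so it is illustrative rather than runnable on its own:

```scala
// Sketch only: assumes Spark on the classpath and an existing
// RDD[(K, V)] named `pairs`; `numCores` is a hypothetical cluster size.
val numCores = 32
val numTasks = numCores * 3 // roughly 2-3x the total cores in the cluster
val counts = pairs.reduceByKey(_ + _, numTasks)
```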

TD


On Sun, Jan 19, 2014 at 1:30 PM, Brad Ruderman <[email protected]> wrote:

> Hi Tathagata-
> Thanks for your response. I appreciate your help. A few follow-up
> questions:
> 1) I updated to master and tried to set the system properties. However
> when I view the properties in the web console, I don't see these listed.
> Also I see in Activity Monitor that org.apache.spark.repl.Main is
> only taking 758mb.
>
> scala> System.setProperty("spark.executor.memory","4gb")
> res0: String = null
>
> scala> System.setProperty("spark.executor.memory","4gb")
> res1: String = 4gb
>
> Did something change in master that would prevent this from being set
> higher in standalone mode? My exact repro is:
>
> git clone [email protected]:apache/incubator-spark.git
> ./sbt/sbt assembly
> cd conf
> cp spark-env.sh.template spark-env.sh
> cp log4j.properties.template log4j.properties
> ./bin/spark-shell
>
> 2) My next steps are going to be to test this job in an AWS EC2 cluster,
> but is there anything I can do locally to ensure that it is properly built
> so that it can be heavily parallelized? I tried increasing the level of
> parallelism as well by passing a parameter to the reduceByKey call, but I
> didn't see any difference in performance. My local computer also doesn't
> seem to suffer as a result (which I think it would; I am running a MB Pro
> Retina i7 + 16gb RAM). I am worried that using ListBuffer or HashMap
> will significantly slow things down and that there are better ways to do this.
>
>
> Thanks,
> Brad
>
>
>
>
>
> On Sat, Jan 18, 2014 at 7:11 PM, Tathagata Das <
> [email protected]> wrote:
>
>> Hello Brad,
>>
>> The shuffle operation seems to be taking too much memory, more than what
>> your Java program can provide. I am not sure whether you have already tried
>> these or not, but there are a few basic things you can try.
>> 1. If you are running a local standalone Spark cluster, you can set the
>> amount of memory that each worker can use by setting spark.executor.memory.
>> See the Spark configuration page
>> <http://spark.incubator.apache.org/docs/latest/configuration.html>.
>> 2. You can increase the number of partitions created from the text file,
>> and increase the number of reducers. This lowers the memory usage of each
>> task. See the Spark tuning guide
>> <http://spark.incubator.apache.org/docs/latest/tuning.html#other-considerations>.
>> 3. If you are still running out of memory, you can try our latest Spark
>> 0.9 <https://github.com/apache/incubator-spark/tree/branch-0.9> (or the
>> master branch). Shuffles have been modified to automatically spill to disk
>> when they need more than the available memory. 0.9 is undergoing community
>> voting and will be released very soon.
>>
>> TD
>>
>>
>> On Sat, Jan 18, 2014 at 4:58 PM, Brad Ruderman <[email protected]> wrote:
>>
>>> Hi All-
>>> I am very new to Spark (and Scala) but I was hoping to get some help
>>> creating a distributed job. Essentially I have a set of files (8k at 300mb
>>> each in HDFS) which was created in Hive (Hadoop) with the form:
>>> Int
>>> Array<Map<int,string>>
>>>
>>> I want to create a Spark job that reads these files and creates the
>>> combinations of the int array. Meaning it iterates over every int, ignoring
>>> the string, in the Array<Map<int,string>> and produces a [k,v] of
>>> ((int_1, int_2), 1). Then it reduces by the key and sums the values to
>>> produce a final result over a large dataset of:
>>> (int_1, int_2), count
>>>
>>> It is producing the set of all possible combinations for that given
>>> record. In order to do this I have locally downloaded a single file and am
>>> running Spark locally on my computer to try to process a single file. The
>>> file is 382715 lines (~400mb), however some of the arrays can be quite big.
>>>
>>> My Spark Job looks like the following:
>>>
>>> import collection.mutable.HashMap
>>> import collection.mutable.ListBuffer
>>>
>>> // Emit every pair (b, y) with b before y in the sorted list; the
>>> // HashMap collapses duplicate pairs to a single key.
>>> def getCombinations(a: List[Int]): HashMap[(Int, Int), Int] = {
>>>   val comboMap = new HashMap[(Int, Int), Int]()
>>>   var aSorted = a.sorted
>>>   for (x <- 0 until aSorted.length) {
>>>     val b = aSorted(0)
>>>     aSorted = aSorted.drop(1)
>>>     for (y <- 0 until aSorted.length) {
>>>       comboMap((b, aSorted(y))) = 1
>>>     }
>>>   }
>>>   comboMap
>>> }
>>>
>>> // Parse one Hive-delimited line: field separator \x01, array-element
>>> // separator \x02, map key/value separator \x03; keep only the int keys.
>>> def getArray(line: String): List[Int] = {
>>>   val a = line.split("\\x01")(1).split("\\x02")
>>>   val ids = new ListBuffer[Int]
>>>   for (x <- 0 until a.length) {
>>>     ids += Integer.parseInt(a(x).split("\\x03")(0))
>>>   }
>>>   ids.toList
>>> }
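For what it's worth, both helpers can be written without mutable state using standard collection methods; `combinations(2)` on the sorted list yields each distinct pair exactly once, which matches the HashMap's deduplication. This is a plain-Scala sketch (no Spark needed); the object name PairHelpers is made up here, and the delimiters are the Hive defaults assumed above:

```scala
// Plain-Scala equivalents of the two helpers above, without mutation.
object PairHelpers {
  // Distinct sorted pairs from one record; combinations(2) on the sorted
  // list emits each distinct pair once, matching the HashMap's dedup.
  def getCombinations(a: List[Int]): Map[(Int, Int), Int] =
    a.sorted.combinations(2).map { case Seq(x, y) => ((x, y), 1) }.toMap

  // Field sep \x01, array-element sep \x02, map key/value sep \x03;
  // keep only the int keys of the maps.
  def getArray(line: String): List[Int] =
    line.split("\\x01")(1).split("\\x02")
        .map(_.split("\\x03")(0).toInt).toList
}
```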
>>>
>>> val textFile = sc.textFile("slices.txt")
>>> val counts = textFile
>>>   .map(line => getArray(line))
>>>   .flatMap(a => getCombinations(a))
>>>   .reduceByKey(_ + _)
>>> counts.saveAsTextFile("spark_test")
>>>
>>>
>>> I am using getArray() to process the text file from its Hive storage
>>> form, and then getCombinations() to create the [k,v] list, and finally
>>> reducing over this list. Eventually I will move this to a distributed
>>> cluster, but I was hoping to determine if there is any way to parallelize
>>> or optimize this job for my local computer. Currently it fails with:
>>>
>>> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java heap space)
>>>
>>> org.apache.spark.util.AppendOnlyMap.growTable(AppendOnlyMap.scala:200)
>>> org.apache.spark.util.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:180)
>>> org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:130)
>>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>> Thanks,
>>> Brad
>>>
>>
>>
>
