Thanks again, Tathagata.

I successfully launched an EC2 cluster with 5 nodes. I have split the tasks
out and found that a lot of time is being spent running this function:

def getCombinations(a: List[Int]): HashMap[(Int,Int),Int] = {
  val comboMap = new HashMap[(Int,Int),Int]()
  var aSorted = a.sorted
  for (x <- 0 to aSorted.length - 1) {
    val b = aSorted(0)
    aSorted = aSorted.drop(1)
    for (y <- 0 to aSorted.length - 1) {
      comboMap((b, aSorted(y))) = 1
    }
  }
  comboMap
}
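For reference, the same unique sorted pairs can be generated without the per-record mutable HashMap by using the standard library's combinations iterator. This is just a sketch I have not benchmarked (the name getCombinationsAlt is made up for illustration):

```scala
// Sketch: yields the same unique sorted pairs per record as the
// HashMap version, lazily. combinations(2) on a sorted list emits
// each distinct pair at most once, so duplicates within a record
// collapse just as they do with the HashMap.
def getCombinationsAlt(a: List[Int]): Iterator[((Int, Int), Int)] =
  a.sorted.combinations(2).map { case List(x, y) => ((x, y), 1) }
```

Since it returns an Iterator of ((Int, Int), 1) pairs, it can be dropped straight into the existing flatMap/reduceByKey pipeline.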

which is called from:
val counts = textFile.map(line => getArray(line)).flatMap(a =>
getCombinations(a)).reduceByKey(_ + _)

Essentially I am trying to perform a "semi-cartesian" product: each
element contained within a larger array is joined against all other
elements within that array. So if I have [[1,2,3],[2,3,5]] that would
become:
1,2
1,3
2,3
2,3
2,5
3,5

which would reduce down to:
1,2 1
1,3 1
2,3 2
2,5 1
3,5 1
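The expansion and reduction above can be sanity-checked locally with plain Scala collections (no Spark involved); a small sketch mirroring flatMap + reduceByKey:

```scala
// Mirror of the Spark pipeline using plain collections: expand each
// record into its unique sorted pairs tagged with 1, then group by
// pair and sum the tags, just as reduceByKey(_ + _) would.
val records = List(List(1, 2, 3), List(2, 3, 5))
val counts = records
  .flatMap(_.sorted.combinations(2).map { case List(a, b) => ((a, b), 1) })
  .groupBy(_._1)
  .map { case (k, vs) => (k, vs.map(_._2).sum) }
// counts maps (2,3) to 2 and every other pair to 1
```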

I am wondering if there is a faster method that could be better
parallelized than what I am doing.
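As an aside, the getArray helper from my earlier message could likewise be written without a ListBuffer. A sketch, assuming the same \x01/\x02/\x03 Hive control-character layout (the name getArrayAlt is made up for illustration):

```scala
// Sketch of getArray without mutation: take the field after \x01,
// split entries on \x02, and parse the int before each \x03.
def getArrayAlt(line: String): List[Int] =
  line.split("\\x01")(1)
    .split("\\x02")
    .map(_.split("\\x03")(0).toInt)
    .toList
```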

Thanks,
Brad

On Sun, Jan 19, 2014 at 2:25 PM, Tathagata Das
<[email protected]> wrote:

> 1) System properties must be set before creating the SparkContext. When
> using the spark shell (which creates the context automatically), you have
> to set them before starting the shell. See the configuration page
> <http://spark.incubator.apache.org/docs/latest/configuration.html#environment-variables>
> to see how to set Java properties through environment variables.
>
> 2) Regarding correctness of computation, if it works locally it should
> work on the cluster. Regarding parallelism, Spark runs only as many tasks
> simultaneously as the number of cores that the SparkContext has asked
> for. So on your local machine, it doesn't help much to increase the number
> of tasks too much, as it only has 4 cores (or 8 hyperthreaded ones, maybe).
> Having 10x more tasks with each task doing 1/10th the computation ends up
> taking more or less the same amount of processing time. With more cores in
> a cluster, it will parallelize further. The number of tasks should be
> configured to be about 2-3x the number of cores in the cluster. See the
> tuning page
> <http://spark.incubator.apache.org/docs/latest/tuning.html#level-of-parallelism>.
>
> TD
>
>
> On Sun, Jan 19, 2014 at 1:30 PM, Brad Ruderman <[email protected]> wrote:
>
>> Hi Tathagata-
>> Thanks for your response. I appreciate your help. A few follow-up
>> questions:
>>  1) I updated to master and tried to set the system properties. However,
>> when I view the properties in the web console, I don't see them listed.
>> Also, I see in Activity Monitor that org.apache.spark.repl.Main is
>> only taking 758mb.
>>
>> scala> System.setProperty("spark.executor.memory","4gb")
>> res0: String = null
>>
>> scala> System.setProperty("spark.executor.memory","4gb")
>> res1: String = 4gb
>>
>> Did something change in master that would prevent this from being set
>> higher in standalone mode? My exact repro is:
>>
>> git clone [email protected]:apache/incubator-spark.git
>> ./sbt/sbt assembly
>> cd conf
>> cp spark-env.sh.template spark-env.sh
>> cp log4j.properties.template log4j.properties
>> ./bin/spark-shell
>>
>> 2) My next steps are going to be to test this job in an Amazon EC2 cluster,
>> but is there anything I can do locally to ensure that it is properly built
>> so that it can be heavily parallelized? I tried increasing the level of
>> parallelism by passing a parameter to reduceByKey, but I
>> didn't see any difference in performance. My local computer also doesn't
>> seem to suffer as a result (which I think it would; I am running a MB Pro
>> Retina i7 + 16gb RAM). I am worried that using ListBuffer or HashMap
>> will significantly slow things down and that there are better ways to do this.
>>
>>
>> Thanks,
>> Brad
>>
>>
>>
>>
>>
>> On Sat, Jan 18, 2014 at 7:11 PM, Tathagata Das <
>> [email protected]> wrote:
>>
>>> Hello Brad,
>>>
>>> The shuffle operation seems to be taking too much memory, more than what
>>> your Java program can provide. I am not sure whether you have already tried
>>> these or not, but there are a few basic things you can try.
>>> 1. If you are running a local standalone Spark cluster, you can set the
>>> amount of memory that the worker can use by setting spark.executor.memory.
>>> See the Spark configuration page
>>> <http://spark.incubator.apache.org/docs/latest/configuration.html>.
>>> 2. You can increase the number of partitions created from the text file,
>>> and increase the number of reducers. This lowers the memory usage of each
>>> task. See the Spark tuning guide
>>> <http://spark.incubator.apache.org/docs/latest/tuning.html#other-considerations>.
>>> 3. If you are still running out of memory, you can try our latest Spark
>>> 0.9 <https://github.com/apache/incubator-spark/tree/branch-0.9> (or the
>>> master branch). Shuffles have been modified to automatically spill to disk
>>> when they need more than the available memory. 0.9 is undergoing community
>>> voting and will be released very soon.
>>>
>>> TD
>>>
>>>
>>> On Sat, Jan 18, 2014 at 4:58 PM, Brad Ruderman
>>> <[email protected]> wrote:
>>>
>>>> Hi All-
>>>> I am very new to Spark (and Scala) but I was hoping to get some help
>>>> creating a distributed job. Essentially I have a set of files (8k at 300mb
>>>> each in HDFS) which was created in Hive (Hadoop) with the form:
>>>> Int
>>>> Array<Map<int,string>>
>>>>
>>>> I want to create a Spark job that reads these files and creates the
>>>> combinations of the int array. Meaning it iterates over every int, ignoring
>>>> the string, in the Array<Map<int,string>> and produces a [k,v] of
>>>> ((int_1, int_2), 1). Then it reduces by the key and sums the values to
>>>> produce a final result over a large dataset of:
>>>> (int_1, int_2), count
>>>>
>>>> It is producing a set of all possible combinations for that given
>>>> record. In order to do this I have locally downloaded a single file and am
>>>> running Spark locally on my computer to try to process a single file. The
>>>> file is 382715 lines (~400mb), however some of the arrays can be quite big.
>>>>
>>>> My Spark Job looks like the following:
>>>>
>>>> import collection.mutable.HashMap
>>>> import collection.mutable.ListBuffer
>>>>
>>>> def getCombinations(a: List[Int]): HashMap[(Int,Int),Int] = {
>>>>   val comboMap = new HashMap[(Int,Int),Int]()
>>>>   var aSorted = a.sorted
>>>>   for (x <- 0 to aSorted.length - 1) {
>>>>     val b = aSorted(0)
>>>>     aSorted = aSorted.drop(1)
>>>>     for (y <- 0 to aSorted.length - 1) {
>>>>       comboMap((b, aSorted(y))) = 1
>>>>     }
>>>>   }
>>>>   comboMap
>>>> }
>>>>
>>>> def getArray(line: String): List[Int] = {
>>>>   val a = line.split("\\x01")(1).split("\\x02")
>>>>   val ids = new ListBuffer[Int]
>>>>   for (x <- 0 to a.length - 1) {
>>>>     ids += Integer.parseInt(a(x).split("\\x03")(0))
>>>>   }
>>>>   ids.toList
>>>> }
>>>>
>>>> val textFile = sc.textFile("slices.txt")
>>>> val counts = textFile.map(line => getArray(line)).flatMap(a =>
>>>> getCombinations(a)).reduceByKey(_ + _)
>>>> counts.saveAsTextFile("spark_test")
>>>>
>>>>
>>>> I am using getArray() to process the text file from its Hive storage
>>>> form, and then getCombinations() to create the k,v list, finally
>>>> reducing over this list. Eventually I will move this to a distributed
>>>> cluster, but I was hoping to determine if there is any way to parallelize
>>>> or optimize this job for my local computer. Currently it fails with:
>>>>
>>>> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java heap space)
>>>>
>>>> org.apache.spark.util.AppendOnlyMap.growTable(AppendOnlyMap.scala:200)
>>>> org.apache.spark.util.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:180)
>>>> org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:130)
>>>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>> Thanks,
>>>> Brad
>>>>
>>>
>>>
>>
>
