Well, right now it is already quite parallelized, since each line is processed
independently of the others, with a single reduce doing the final counting.
You can optimize the combination generation in several ways.

1. I may be missing something, but do you need to sort the list into aSorted?
That may be costly for large lists.

2. Furthermore, repeatedly dropping elements from the sorted sequence is
probably costly. Just walking through the list with indices is likely to be
much more efficient, for example:

for (xi <- 0 until length) {
  for (yi <- xi + 1 until length) {
    ...
  }
}

3. Creating and destroying a hashmap for every array of numbers, that is,
for every line in the file, is not very efficient. Using mapPartitions is
better, as you can create one hashmap for the entire partition of data and
process the whole partition of the file in one go. A fuller end-to-end sketch
follows after the snippet below.

textFile.mapPartitions(lines => {
    val hashmap = ...
    lines.foreach(line => {
      val array = getArray(line)
      // add combinations from array into the hashmap
    })
    hashmap.iterator  // mapPartitions expects an Iterator back
})
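
Putting 2 and 3 together, the per-partition counting could look roughly like
the sketch below. This is only a sketch, not a drop-in replacement: it reuses
your getArray, keeps a sort only so each pair comes out in (smaller, larger)
order, and, unlike the per-line hashmap, it counts a pair once per occurrence
rather than once per line, so dedup within a line first if that matters.

import collection.mutable.HashMap

val counts = textFile.mapPartitions(lines => {
  // one hashmap per partition, reused across all lines in that partition
  val partitionMap = new HashMap[(Int, Int), Int]()
  lines.foreach { line =>
    val a = getArray(line).sorted.toArray  // sort so pairs are (small, large)
    for (xi <- 0 until a.length) {
      for (yi <- xi + 1 until a.length) {
        val key = (a(xi), a(yi))
        partitionMap(key) = partitionMap.getOrElse(key, 0) + 1
      }
    }
  }
  partitionMap.iterator  // return an Iterator of partial counts for the shuffle
}).reduceByKey(_ + _)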

Hope this helps.

TD


On Mon, Jan 20, 2014 at 4:53 PM, Brad Ruderman <[email protected]> wrote:

> Thanks Again Tathagata.
>
> I successfully launched an EC2 cluster with 5 nodes. I have split the
> tasks out and found a lot of time is being spent running the:
>
> def getCombinations(a: List[Int]) : HashMap[(Int,Int),Int] = {
>   val comboMap = new HashMap[(Int,Int),Int]()
>   var aSorted = a.sorted
>   for (x <- 0 to aSorted.length - 1) {
>     var b = aSorted(0)
>     aSorted = aSorted.drop(1)
>     for (y <- 0 to aSorted.length - 1) {
>       comboMap((b, aSorted(y))) = 1
>     }
>   }
>   return comboMap
> }
>
> from the:
> val counts = textFile.map(line => getArray(line)).flatMap(a =>
> getCombinations(a)).reduceByKey(_ + _)
>
> Essentially I am trying to perform a "semi-cartesian" product: each element
> contained within an array is joined against all other elements within that
> array. So if I have [[1,2,3],[2,3,5]] that would become:
> 1,2
> 1,3
> 2,3
> 2,3
> 2,5
> 3,5
>
> which would reduce down to
> 1,2 1
> 1,3 1
> 2,3 2
> 2,5 1
> 3,5 1
>
> I am wondering if there is a faster method that could be better
> parallelized than what I am doing.
>
> Thanks,
> Brad
>
> On Sun, Jan 19, 2014 at 2:25 PM, Tathagata Das <
> [email protected]> wrote:
>
>> 1) System properties must be set before creating the SparkContext. When
>> using the spark shell (which creates the context automatically), you have
>> to set them before starting the shell. See the configuration page
>> <http://spark.incubator.apache.org/docs/latest/configuration.html#environment-variables>
>> to see how to set Java properties through environment variables.
>>
>> 2) Regarding correctness of computation, if it works locally it should
>> work on the cluster. Regarding parallelism, Spark runs only as many tasks
>> simultaneously as the number of cores the SparkContext has asked for. So on
>> your local machine, it doesn't help much to increase the number of tasks,
>> as it only has 4 cores (or 8 hyperthreaded ones, maybe). Having 10x more
>> tasks with each task doing 1/10th of the computation ends up taking more or
>> less the same amount of processing time. With more cores in a cluster, it
>> will parallelize further. The number of tasks should be configured to be
>> about 2-3x the number of cores in the cluster. See the tuning page
>> <http://spark.incubator.apache.org/docs/latest/tuning.html#level-of-parallelism>.
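>>
>> For example, the reducer count can be passed straight to reduceByKey (a
>> rough sketch; 32 is only an illustrative number, aim for roughly 2-3x the
>> total cores in your cluster):
>>
>> val counts = textFile.map(line => getArray(line)).flatMap(a =>
>> getCombinations(a)).reduceByKey(_ + _, 32)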
>>
>> TD
>>
>>
>> On Sun, Jan 19, 2014 at 1:30 PM, Brad Ruderman <[email protected]> wrote:
>>
>>> Hi Tathagata-
>>> Thanks for your response. I appreciate your help. A few follow-up
>>> questions:
>>>  1) I updated to master and tried to set the system properties. However
>>> when I view the properties in the web console, I don't see these listed.
>>> Also I see in Activity Monitor that org.apache.spark.repl.Main is only
>>> taking 758mb.
>>>
>>> scala> System.setProperty("spark.executor.memory","4gb")
>>> res0: String = null
>>>
>>> scala> System.setProperty("spark.executor.memory","4gb")
>>> res1: String = 4gb
>>>
>>> Did something change in master that would prevent this from being set
>>> higher in standalone mode? My exact repo is:
>>>
>>> git clone [email protected]:apache/incubator-spark.git
>>> ./sbt/sbt assembly
>>> cd conf
>>> cp spark-env.sh.template spark-env.sh
>>> cp log4j.properties.template log4j.properties
>>> ./bin/spark-shell
>>>
>>> 2) My next steps are going to be to test this job in an AMZ EC2 cluster,
>>> but is there anything I can do locally to ensure that it is properly built
>>> so that it can be heavily parallelized? I tried increasing the level of
>>> parallelism as well by passing a parameter to reduceByKey, but I didn't
>>> see any difference in performance. My local computer also doesn't seem to
>>> suffer as a result (which I think it would; I am running a MB Pro Retina
>>> i7 + 16gb RAM). I am worried that using ListBuffer or HashMap will
>>> significantly slow things down and that there are better ways to do this.
>>>
>>>
>>> Thanks,
>>> Brad
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Jan 18, 2014 at 7:11 PM, Tathagata Das <
>>> [email protected]> wrote:
>>>
>>>> Hello Brad,
>>>>
>>>> The shuffle operation seems to be taking too much memory, more than
>>>> what your Java process can provide. I am not sure whether you have already
>>>> tried them or not, but there are a few basic things you can try.
>>>> 1. If you are running a local standalone Spark cluster, you can set the
>>>> amount of memory that the worker can use by setting spark.executor.memory.
>>>> See the Spark configuration page
>>>> <http://spark.incubator.apache.org/docs/latest/configuration.html>.
>>>> 2. You can increase the number of partitions created from the text
>>>> file, and increase the number of reducers. This lowers the memory usage of
>>>> each task. See the Spark tuning guide
>>>> <http://spark.incubator.apache.org/docs/latest/tuning.html#other-considerations>
>>>> (and the small example after this list).
>>>> 3. If you are still running out of memory, you can try our latest Spark
>>>> 0.9 <https://github.com/apache/incubator-spark/tree/branch-0.9> (or the
>>>> master branch). Shuffles have been modified to automatically spill to disk
>>>> when they need more than the available memory. 0.9 is undergoing community
>>>> voting and will be released very soon.
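>>>>
>>>> For example, for point 2, the number of input partitions can be requested
>>>> when reading the file (a rough sketch; 64 is just an illustrative number):
>>>>
>>>> val textFile = sc.textFile("slices.txt", 64)  // split the input into more, smaller partitions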
>>>>
>>>> TD
>>>>
>>>>
>>>> On Sat, Jan 18, 2014 at 4:58 PM, Brad Ruderman
>>>> <[email protected]> wrote:
>>>>
>>>>> Hi All-
>>>>> I am very new to Spark (and Scala) but I was hoping to get some help
>>>>> creating a distributed job. Essentially I have a set of files (8k at 300mb
>>>>> each in HDFS) which was created in Hive (Hadoop) with the form:
>>>>> Int
>>>>> Array<Map<int,string>>
>>>>>
>>>>> I want to create a Spark job that reads these files and creates
>>>>> combinations from the int array. Meaning it iterates over every int in the
>>>>> Array<Map<int,string>>, ignoring the string, and produces a [k,v] of
>>>>> (int_1,int_2), 1. Then it reduces by the k and sums the v to produce a
>>>>> final result over a large dataset of:
>>>>> (int_1, int_2), count
>>>>>
>>>>> It is producing the set of all possible combinations for each given
>>>>> record. In order to do this I have downloaded a single file locally and am
>>>>> running Spark on my computer to try to process it. The file is 382715
>>>>> lines (~400mb), however some of the arrays can be quite big.
>>>>>
>>>>> My Spark Job looks like the following:
>>>>>
>>>>> import collection.mutable.HashMap
>>>>> import collection.mutable.ListBuffer
>>>>>
>>>>> def getCombinations(a: List[Int]) : HashMap[(Int,Int),Int] = {
>>>>>   val comboMap = new HashMap[(Int,Int),Int]()
>>>>>   var aSorted = a.sorted
>>>>>   for (x <- 0 to aSorted.length - 1) {
>>>>>     var b = aSorted(0)
>>>>>     aSorted = aSorted.drop(1)
>>>>>     for (y <- 0 to aSorted.length - 1) {
>>>>>       comboMap((b, aSorted(y))) = 1
>>>>>     }
>>>>>   }
>>>>>   return comboMap
>>>>> }
>>>>>
>>>>> def getArray(line: String): List[Int] = {
>>>>>   var a = line.split("\\x01")(1).split("\\x02")
>>>>>   var ids = new ListBuffer[Int]
>>>>>   for (x <- 0 to a.length - 1) {
>>>>>     ids += Integer.parseInt(a(x).split("\\x03")(0))
>>>>>   }
>>>>>   return ids.toList
>>>>> }
>>>>>
>>>>> val textFile = sc.textFile("slices.txt")
>>>>> val counts = textFile.map(line => getArray(line)).flatMap(a =>
>>>>> getCombinations(a)).reduceByKey(_ + _)
>>>>> counts.saveAsTextFile("spark_test")
>>>>>
>>>>>
>>>>> I am using getArray() to process the text file from its Hive storage
>>>>> form, and then getCombinations() to create the k,v list, then finally
>>>>> reducing over this list. Eventually I will move this to a distributed
>>>>> cluster, but I was hoping to determine if there is any way to parallelize
>>>>> or optimize this job for my local computer. Currently it fails with:
>>>>>
>>>>> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java heap
>>>>> space)
>>>>>
>>>>> org.apache.spark.util.AppendOnlyMap.growTable(AppendOnlyMap.scala:200)
>>>>> org.apache.spark.util.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:180)
>>>>> org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:130)
>>>>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>>>
>>>>> Thanks,
>>>>> Brad
>>>>>
>>>>
>>>>
>>>
>>
>
