Hi Tathagata-
Thanks for your help. I appreciate your suggestions. I am having trouble
following the code for mapPartitions.
According to the documentation I need to pass a function of type
Iterator[T] => Iterator[U], which is why I get an error on the foreach:
var textFile = sc.textFile("input.txt")
textFile.mapPartitions(lines => {
  lines.foreach( line => {
    println(line)
  })
})
<console>:18: error: type mismatch;
found : Unit
required: Iterator[?]
Error occurred in an application involving default arguments.
lines.foreach( line => {
Therefore I think I need to run a map, but I am not sure whether I should
reduce within the mapPartitions lambda or outside it.
val counts = textFile.mapPartitions(lines => {
  val hashmap = new HashMap[(Int,Int),Int]()
  lines.map( line => {
    val array = getArray(line)
    for (x <- 0 to array.length - 1){
      for (y <- 0 to array.length - 1){
        hashmap((array(x), array(y))) = 1
      }
    }
  })
})
In this case the hashmap is not accessible outside the partition; when I
save to file it is empty. Could you please clarify how to use
mapPartitions()?
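My best guess (untested; it assumes the getArray helper and the mutable
HashMap import from my earlier mail, and uses the index-based pair loop you
suggested) is that I need to consume the lines with foreach and then hand
back an iterator over the hashmap, something like:

val counts = textFile.mapPartitions(lines => {
  val hashmap = new HashMap[(Int,Int),Int]()
  lines.foreach(line => {
    val array = getArray(line)
    for (x <- 0 until array.length) {
      for (y <- x + 1 until array.length) {
        // count each pair seen within this partition
        val pair = (array(x), array(y))
        hashmap(pair) = hashmap.getOrElse(pair, 0) + 1
      }
    }
  })
  hashmap.iterator  // mapPartitions expects an Iterator back
})

Is that roughly the right shape, or am I still missing something?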
Thanks,
Brad
On Mon, Jan 20, 2014 at 6:41 PM, Tathagata Das
<[email protected]> wrote:
> Well, right now it is quite parallelized, as each line is processed
> independently of the others, with a single reduce doing the final counting.
> You can optimize the combination generation in several ways.
>
> 1. I may be missing something, but do you need to sort the list into
> aSorted? That may be costly for large lists.
>
> 2. Furthermore, dropping from the sorted sequence is probably costly. Just
> walking through the list with indices is likely to be much more efficient:
>
> for (xi <- 0 until length) {
>   for (yi <- xi + 1 until length) {
>     ...
>   }
> }
>
> 3. Creating and destroying hashmaps for every array of numbers, that is,
> for every line in the file, is not very efficient. Using mapPartitions is
> better, as you can create one hashmap for the entire partition of data and
> process the whole partition of the file in one go.
>
> textFile.mapPartitions(lines => {
>   val hashmap = ...
>   lines.foreach(line => {
>     val array = getArray(line)
>     // add combinations from array
>   })
>   hashmap.iterator  // the function must hand back an Iterator
> })
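>
> To make 1 and 2 concrete, here is a rough, untested sketch of
> getCombinations without the sort and the drops (it keeps your signature
> and the mutable HashMap import from your code):
>
> def getCombinations(a: List[Int]): HashMap[(Int,Int),Int] = {
>   val comboMap = new HashMap[(Int,Int),Int]()
>   val arr = a.toArray  // constant-time indexing instead of dropping from a List
>   for (xi <- 0 until arr.length) {
>     for (yi <- xi + 1 until arr.length) {
>       comboMap((arr(xi), arr(yi))) = 1
>     }
>   }
>   comboMap
> }
>
> If you do need each pair in (smaller, larger) order for the reduce, you can
> order the two elements of the pair with a simple comparison instead of
> sorting the whole list.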
>
> Hope this helps.
>
> TD
>
>
> On Mon, Jan 20, 2014 at 4:53 PM, Brad Ruderman <[email protected]> wrote:
>
>> Thanks Again Tathagata.
>>
>> I successfully launched an EC2 cluster with 5 nodes. I have split the
>> tasks out and found that a lot of time is being spent running:
>>
>> def getCombinations(a: List[Int]) : HashMap[(Int,Int),Int] = {
>>   val comboMap = new HashMap[(Int,Int),Int]()
>>   var aSorted = a.sorted
>>   for (x <- 0 to aSorted.length - 1){
>>     var b = aSorted(0)
>>     aSorted = aSorted.drop(1)
>>     for (y <- 0 to aSorted.length - 1){
>>       comboMap((b, aSorted(y))) = 1
>>     }
>>   }
>>   return comboMap
>> }
>>
>> which is called from:
>> val counts = textFile.map(line => getArray(line)).flatMap(a =>
>> getCombinations(a)).reduceByKey(_ + _)
>>
>> Essentially I am trying to perform a "semi-cartesian" product: each
>> element contained within a larger array is joined against every other
>> element within that array. So if I have [[1,2,3],[2,3,5]] that would
>> become:
>> 1,2
>> 1,3
>> 2,3
>> 2,3
>> 2,5
>> 3,5
>>
>> which would reduce down to
>> 1,2 1
>> 1,3 1
>> 2,3 2
>> 2,5 1
>> 3,5 1
>>
>> I am wondering if there is a faster method that could be better
>> parallelized than what I am doing.
>>
>> Thanks,
>> Brad
>>
>> On Sun, Jan 19, 2014 at 2:25 PM, Tathagata Das <
>> [email protected]> wrote:
>>
>>> 1) System properties must be set before creating the SparkContext. When
>>> using the spark shell (which creates the context automatically), you have
>>> to set them before starting the shell. See the configuration page
>>> <http://spark.incubator.apache.org/docs/latest/configuration.html#environment-variables>
>>> to see how to set Java properties through environment variables.
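>>>
>>> For example, something like this before launching the shell might work
>>> (untested; it assumes the SPARK_JAVA_OPTS variable described on the
>>> configuration page, and note the size format is "4g" rather than "4gb"):
>>>
>>> export SPARK_JAVA_OPTS="-Dspark.executor.memory=4g"
>>> ./bin/spark-shell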
>>>
>>> 2) Regarding correctness of computation, if it works locally it should
>>> work on the cluster. Regarding parallelism, Spark runs only as many tasks
>>> simultaneously as there are cores that the SparkContext has asked for. So
>>> on your local machine, it doesn't help much to increase the number of
>>> tasks too much, as it only has 4 cores (or 8 hyperthreaded ones, maybe).
>>> Having 10x more tasks with each task doing 1/10th the computation ends up
>>> taking more or less the same amount of processing time. With more cores in
>>> a cluster, it will parallelize further. The number of tasks should be
>>> configured to be about 2-3x the number of cores in the cluster. See the
>>> tuning page
>>> <http://spark.incubator.apache.org/docs/latest/tuning.html#level-of-parallelism>.
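>>>
>>> For example, reduceByKey takes an optional number of partitions, so on a
>>> cluster with, say, 20 cores you could try something along these lines
>>> (the exact number is just a starting point to experiment with):
>>>
>>> val counts = textFile.map(line => getArray(line))
>>>   .flatMap(a => getCombinations(a))
>>>   .reduceByKey(_ + _, 48)  // roughly 2-3x the total number of cores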
>>>
>>> TD
>>>
>>>
>>> On Sun, Jan 19, 2014 at 1:30 PM, Brad Ruderman
>>> <[email protected]> wrote:
>>>
>>>> Hi Tathagata-
>>>> Thanks for your response. I appreciate your help. A few follow-up
>>>> questions:
>>>> 1) I updated to master and tried to set the system properties. However,
>>>> when I view the properties in the web console, I don't see these listed.
>>>> Also, I see in Activity Monitor that org.apache.spark.repl.Main is only
>>>> taking 758mb.
>>>>
>>>> scala> System.setProperty("spark.executor.memory","4gb")
>>>> res0: String = null
>>>>
>>>> scala> System.setProperty("spark.executor.memory","4gb")
>>>> res1: String = 4gb
>>>>
>>>> Did something change in master that would prevent this from being set
>>>> higher in standalone mode? The exact steps I ran are:
>>>>
>>>> git clone [email protected]:apache/incubator-spark.git
>>>> ./sbt/sbt assembly
>>>> cd conf
>>>> cp spark-env.sh.template spark-env.sh
>>>> cp log4j.properties.template log4j.properties
>>>> ./bin/spark-shell
>>>>
>>>> 2) My next step is to test this job on an Amazon EC2 cluster, but is
>>>> there anything I can do locally to ensure that it is properly built so
>>>> that it can be heavily parallelized? I tried increasing the level of
>>>> parallelism by passing a parameter to reduceByKey, but I didn't see any
>>>> difference in performance. My local computer also doesn't seem to suffer
>>>> as a result, which I would expect it to (I am running a MB Pro Retina i7
>>>> + 16gb RAM). I am worried that using ListBuffer or HashMap will
>>>> significantly slow things down and that there are better ways to do this.
>>>>
>>>>
>>>> Thanks,
>>>> Brad
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Jan 18, 2014 at 7:11 PM, Tathagata Das <
>>>> [email protected]> wrote:
>>>>
>>>>> Hello Brad,
>>>>>
>>>>> The shuffle operation seems to be taking too much memory, more than
>>>>> what your Java process can provide. I am not sure whether you have
>>>>> already tried these or not, but there are a few basic things you can try.
>>>>> 1. If you are running a local standalone Spark cluster, you can set
>>>>> the amount of memory that the worker can use by setting
>>>>> spark.executor.memory. See the Spark configuration page
>>>>> <http://spark.incubator.apache.org/docs/latest/configuration.html>.
>>>>> 2. You can increase the number of partitions created from the text
>>>>> file, and increase the number of reducers. This lowers the memory usage
>>>>> of each task. See the Spark tuning guide
>>>>> <http://spark.incubator.apache.org/docs/latest/tuning.html#other-considerations>
>>>>> (a quick sketch of this is below).
>>>>> 3. If you are still running out of memory, you can try our latest
>>>>> Spark 0.9 <https://github.com/apache/incubator-spark/tree/branch-0.9>
>>>>> (or the master branch). Shuffles have been modified to automatically
>>>>> spill to disk when they need more than the available memory. 0.9 is
>>>>> undergoing community voting and will be released very soon.
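>>>>>
>>>>> For 2, both knobs are just extra arguments; a rough sketch (the numbers
>>>>> are only a starting point to experiment with) would be:
>>>>>
>>>>> val textFile = sc.textFile("slices.txt", 64)  // ask for more input partitions
>>>>> val counts = textFile.map(line => getArray(line))
>>>>>   .flatMap(a => getCombinations(a))
>>>>>   .reduceByKey(_ + _, 64)  // and more reduce tasks
>>>>> counts.saveAsTextFile("spark_test")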
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>> On Sat, Jan 18, 2014 at 4:58 PM, Brad Ruderman <[email protected]
>>>>> > wrote:
>>>>>
>>>>>> Hi All-
>>>>>> I am very new to Spark (and Scala), but I was hoping to get some help
>>>>>> creating a distributed job. Essentially I have a set of files (8k files
>>>>>> at 300mb each in HDFS) which were created in Hive (Hadoop) with the
>>>>>> form:
>>>>>> Int
>>>>>> Array<Map<int,string>>
>>>>>>
>>>>>> I want to create a Spark job that reads these files and creates the
>>>>>> combinations of the int array. That is, it iterates over every int in
>>>>>> the Array<Map<int,string>>, ignoring the string, and produces a [k,v]
>>>>>> of ((int_1, int_2), 1). It then reduces by the key and sums the values
>>>>>> to produce a final result over a large dataset of:
>>>>>> (int_1, int_2), count
>>>>>>
>>>>>> This produces the set of all possible combinations for each given
>>>>>> record. To test this, I have downloaded a single file and am running
>>>>>> Spark locally on my computer to try to process it. The file is 382715
>>>>>> lines (~400mb), although some of the arrays can be quite big.
>>>>>>
>>>>>> My Spark Job looks like the following:
>>>>>>
>>>>>> import collection.mutable.HashMap
>>>>>> import collection.mutable.ListBuffer
>>>>>>
>>>>>> def getCombinations(a: List[Int]) : HashMap[(Int,Int),Int] = {
>>>>>>   val comboMap = new HashMap[(Int,Int),Int]()
>>>>>>   var aSorted = a.sorted
>>>>>>   for (x <- 0 to aSorted.length - 1){
>>>>>>     var b = aSorted(0)
>>>>>>     aSorted = aSorted.drop(1)
>>>>>>     for (y <- 0 to aSorted.length - 1){
>>>>>>       comboMap((b, aSorted(y))) = 1
>>>>>>     }
>>>>>>   }
>>>>>>   return comboMap
>>>>>> }
>>>>>>
>>>>>> def getArray(line: String): List[Int] = {
>>>>>>   // Hive's default delimiters: \x01 between columns, \x02 between
>>>>>>   // collection items, \x03 between a map key and its value
>>>>>>   var a = line.split("\\x01")(1).split("\\x02")
>>>>>>   var ids = new ListBuffer[Int]
>>>>>>   for (x <- 0 to a.length - 1){
>>>>>>     ids += Integer.parseInt(a(x).split("\\x03")(0))  // keep only the map key
>>>>>>   }
>>>>>>   return ids.toList
>>>>>> }
>>>>>>
>>>>>> val textFile = sc.textFile("slices.txt")
>>>>>> val counts = textFile.map(line => getArray(line)).flatMap(a =>
>>>>>> getCombinations(a)).reduceByKey(_ + _)
>>>>>> counts.saveAsTextFile("spark_test")
>>>>>>
>>>>>>
>>>>>> I am using getArray() to parse each line from its Hive storage format,
>>>>>> then getCombinations() to create the k,v list, and finally reducing over
>>>>>> that list. Eventually I will move this to a distributed cluster, but I
>>>>>> was hoping to determine if there is any way to parallelize or optimize
>>>>>> this job for my local computer. Currently it fails with:
>>>>>>
>>>>>> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java heap
>>>>>> space)
>>>>>>
>>>>>> org.apache.spark.util.AppendOnlyMap.growTable(AppendOnlyMap.scala:200)
>>>>>> org.apache.spark.util.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:180)
>>>>>> org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:130)
>>>>>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>>>> Thanks,
>>>>>> Brad
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>