Since you want to count the frequency of each pair, you basically want to
return a sequence of (integer-pair, count-in-partition) pairs from the
mapPartitions, so that the output of the mapPartitions is an
RDD[((Int, Int), Int)]. Just returning hashmap.toIterator should be fine.
import collection.mutable.HashMap

val counts = textFile.mapPartitions(lines => {
  val hashmap = new HashMap[(Int, Int), Int]()
  lines.foreach(line => {
    val array = getArray(line)
    for ((x, i) <- array.view.zipWithIndex) {
      for (j <- (i + 1) to array.length - 1) {
        hashmap((x, array(j))) = hashmap.getOrElse((x, array(j)), 0) + 1
      }
    }
  })
  hashmap.toIterator   // Iterator[((Int, Int), Int)]
})
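
Since each element of that iterator is already a ((Int, Int), Int) pair, the
final counts are then just a reduceByKey away. A minimal sketch of the rest of
the pipeline (the output path is only an example):

  val totals = counts.reduceByKey(_ + _)
  totals.saveAsTextFile("pair_counts")   // example output path
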
Also, if you want higher performance than the default HashMap, you can try
other hash maps specialized for primitive types, which avoid boxing/unboxing
of ints and can thus give a significant performance boost. Take a look at these
collections: http://fastutil.di.unimi.it/docs/index.html?it/unimi/dsi/fastutil/objects/Object2IntOpenHashMap.html
You do have to figure out some way to get an iterator out of them, though,
since mapPartitions needs an iterator.
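For instance, here is a rough, untested sketch using fastutil's
Object2IntOpenHashMap, assuming its addTo and object2IntEntrySet methods and
using JavaConverters to turn the Java iterator into a Scala one (the tuple
keys are still boxed; only the Int counts stay primitive):

  import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap
  import scala.collection.JavaConverters._

  val counts = textFile.mapPartitions(lines => {
    val hashmap = new Object2IntOpenHashMap[(Int, Int)]()
    lines.foreach(line => {
      val array = getArray(line)
      for ((x, i) <- array.view.zipWithIndex) {
        for (j <- (i + 1) to array.length - 1) {
          hashmap.addTo((x, array(j)), 1)   // increment in place, no getOrElse
        }
      }
    })
    // expose the entries as the Iterator[((Int, Int), Int)] mapPartitions expects
    hashmap.object2IntEntrySet().iterator().asScala
      .map(e => (e.getKey, e.getIntValue))
  })
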
On Tue, Jan 21, 2014 at 3:53 PM, Brad Ruderman <[email protected]> wrote:
> Thanks Nick. I really appreciate your help. I understand the iterator and
> have modified the code to return the iterator. What I am noticing is that
> it returns:
>
> import collection.mutable.HashMap
> import collection.mutable.ListBuffer
> def getArray(line: String):List[Int] = {
> var a = line.split("\\x01")(1).split("\\x02")
> var ids = new ListBuffer[Int]
> for (x <- 0 to a.length - 1){
> ids += Integer.parseInt(a(x).split("\\x03")(0))
> }
> return ids.toList
> }
>
> var textFile = sc.textFile("input.txt")
> val counts = textFile.mapPartitions(lines => {
> val hashmap = new HashMap[(Int,Int),Int]()
> lines.foreach( line => {
> val array = getArray(line)
> for((x,i) <- array.view.zipWithIndex){
> for (j <- (i+1) to array.length - 1){
> hashmap((x,array(j))) = hashmap.getOrElse((x,array(j)),0) + 1
> }
> }
> })
> Iterator.single(hashmap)
> })
>
> counts: org.apache.spark.rdd.RDD[scala.collection.mutable.HashMap[(Int,
> Int),Int]] = MapPartitionsRDD[18] at mapPartitions at <console>:26
>
> However on the original flatMap there is some type of implicit conversion
> that is happening since it is returning:
> counts: org.apache.spark.rdd.RDD[((Int, Int), Int)] = FlatMappedRDD[22] at
> flatMap at <console>:31
>
> Thus I cannot reduceByKey(_ + _) since I get this error:
> <console>:35: error: value reduceByKey is not a member of
> org.apache.spark.rdd.RDD[scala.collection.mutable.HashMap[(Int, Int),Int]]
> counts.reduceByKey(_ + _)
>
> I believe there is some type of implicit conversion happening from a
> HashMap to a Seq of ((Int, Int), Int). However I can't seem to mimic
> (implicitly or explicitly) this type of conversion. I am running this within
> the spark shell, so importing SparkContext or Java conversions shouldn't be
> necessary.
>
> Once again thanks for all your help. As you can tell I am very new to
> Scala and Spark.
>
> Thanks,
> Brad
>
>
> On Tue, Jan 21, 2014 at 12:14 PM, Nick Pentreath <[email protected]
> > wrote:
>
>> mapPartitions expects an iterator return value. If within each partition
>> you're aggregating into the hashmap, you can use your foreach operation (or
>> map) to aggregate values in the hashmap, and then return Iterator.single (I
>> think, can't quite remember the syntax).
>>
>> Thus each mapPartitions call will return an iterator containing a single
>> hashmap, which in turn contains the aggregate for the partition. Then
>> perform a reduceByKey operation on the (Int, Int) keys with the hashmap
>> values for a combined count.
>>
>> You may want to take a look at Algebird (
>> https://github.com/twitter/algebird) which contains monoids for map
>> aggregation which may be useful.
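>>
>> As a rough, untested sketch of what that map monoid buys you (assuming
>> Algebird's Semigroup.plus and its implicit Map semigroup, and immutable
>> Maps rather than the mutable HashMap above):
>>
>>   import com.twitter.algebird.Semigroup
>>
>>   val m1 = Map((1, 2) -> 3, (1, 3) -> 1)   // counts from one partition
>>   val m2 = Map((1, 2) -> 2, (2, 3) -> 4)   // counts from another
>>   // sums values per key: Map((1,2) -> 5, (1,3) -> 1, (2,3) -> 4)
>>   val merged = Semigroup.plus(m1, m2)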
>>
>> —
>> Sent from Mailbox <https://www.dropbox.com/mailbox> for iPhone
>>
>>
>> On Tue, Jan 21, 2014 at 10:05 PM, Brad Ruderman
>> <[email protected]> wrote:
>>
>>>
>>> Hi Tathagata-
>>> Thanks for your help. I appreciate your suggestions. I am having trouble
>>> following the code for mapPartitions. According to the documentation I
>>> need to pass a function that returns an Iterator[T], so I receive an error
>>> on the foreach.
>>>
>>> var textFile = sc.textFile("input.txt")
>>> textFile.mapPartitions(lines => {
>>> lines.foreach( line => {
>>> println(line)
>>> })
>>> })
>>> <console>:18: error: type mismatch;
>>> found : Unit
>>> required: Iterator[?]
>>> Error occurred in an application involving default arguments.
>>> lines.foreach( line => {
>>>
>>> Therefore I think I need to run a map, but then I am not sure whether I
>>> should reduce within the mapPartitions lambda or outside it.
>>>
>>> val counts = textFile.mapPartitions(lines => {
>>> val hashmap = new HashMap[(Int,Int),Int]()
>>> lines.map( line => {
>>> val array = getArray(line)
>>> for (x <- 0 to array.length - 1){
>>> for (y <- 0 to array.length - 1){
>>> hashmap((array(x), array(y))) = 1
>>> }
>>> }
>>> })
>>> })
>>>
>>> In this case the hashmap is not accessible outside the partitions, since
>>> when I save to a file it is empty. Could you please clarify how to use
>>> mapPartitions()?
>>>
>>> Thanks,
>>> Brad
>>>
>>> On Mon, Jan 20, 2014 at 6:41 PM, Tathagata Das <
>>> [email protected]> wrote:
>>>
>>>> Well, right now it is quite parallelized, as each line is processed
>>>> independently of the others, with a single reduce doing the final
>>>> counting. You can optimize the combination generation in several ways.
>>>>
>>>> 1. I may be missing something, but do you need to sort the list into
>>>> aSorted? That may be costly for large lists.
>>>>
>>>> 2. Furthermore, dropping from the sorted sequence is probably costly.
>>>> Just walking through the list with indices is likely to be much more
>>>> efficient.
>>>>
>>>> for (xi <- 0 until length) {
>>>>   for (yi <- xi + 1 until length) {
>>>>     ...
>>>>   }
>>>> }
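>>>>
>>>> As a minimal sketch, that index walk applied to your pair counting might
>>>> look like the following (indexing into an IndexedSeq such as a Vector or
>>>> Array, since indexing a List is O(n); it also drops the sort per point 1,
>>>> so keep the sort if you need each pair ordered as (smaller, larger)):
>>>>
>>>>   def addPairs(ids: IndexedSeq[Int],
>>>>                counts: collection.mutable.HashMap[(Int, Int), Int]): Unit = {
>>>>     for (xi <- 0 until ids.length) {
>>>>       for (yi <- xi + 1 until ids.length) {
>>>>         val key = (ids(xi), ids(yi))           // pair each element with every later one
>>>>         counts(key) = counts.getOrElse(key, 0) + 1
>>>>       }
>>>>     }
>>>>   }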
>>>>
>>>> 3. Creating and destroying hashmaps for every array of numbers, that
>>>> is, for every line in the file, is not very efficient. Using mapPartitions
>>>> is better, as you can create one hashmap for the entire partition of data
>>>> and process the whole partition of the file in one go.
>>>>
>>>> textFile.mapPartitions(lines => {
>>>>   val hashmap = ...
>>>>   lines.foreach(line => {
>>>>     val array = getArray(line)
>>>>     // add combinations from array
>>>>   })
>>>>   hashmap
>>>> })
>>>>
>>>> Hope this helps.
>>>>
>>>> TD
>>>>
>>>>
>>>> On Mon, Jan 20, 2014 at 4:53 PM, Brad Ruderman
>>>> <[email protected]> wrote:
>>>>
>>>>> Thanks Again Tathagata.
>>>>>
>>>>> I successfully launched an EC2 cluster with 5 nodes. I have split the
>>>>> tasks out and found that a lot of time is being spent running the:
>>>>>
>>>>> def getCombinations(a: List[Int]) : HashMap[(Int,Int),Int] = {
>>>>> val comboMap = new HashMap[(Int,Int),Int]()
>>>>> var aSorted = a.sorted
>>>>> for (x <- 0 to aSorted.length - 1){
>>>>> var b = aSorted(0)
>>>>> aSorted = aSorted.drop(1)
>>>>> for (y <- 0 to aSorted.length - 1){
>>>>> comboMap((b, aSorted(y))) = 1
>>>>> }
>>>>> }
>>>>> return comboMap
>>>>> }
>>>>>
>>>>> from the:
>>>>> val counts = textFile.map(line => getArray(line)).flatMap(a =>
>>>>> getCombinations(a)).reduceByKey(_ + _)
>>>>>
>>>>> Essentially I am trying to perform a "semi-cartesian" product: each
>>>>> element contained within an inner array is joined against all other
>>>>> elements within that array, producing each pair once. So if I have
>>>>> [[1,2,3],[2,3,5]] that would become:
>>>>> 1,2
>>>>> 1,3
>>>>> 2,3
>>>>> 2,3
>>>>> 2,5
>>>>> 3,5
>>>>>
>>>>> which would reduce down to
>>>>> 1,2 1
>>>>> 1,3 1
>>>>> 2,3 2
>>>>> 2,5 1
>>>>> 3,5 1
>>>>>
>>>>> I am wondering if there is a faster method that could be better
>>>>> parallelized than what I am doing.
>>>>>
>>>>> Thanks,
>>>>> Brad
>>>>>
>>>>> On Sun, Jan 19, 2014 at 2:25 PM, Tathagata Das <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> 1) System properties must be set before creating the SparkContext.
>>>>>> When using the spark shell (which creates the context automatically), you
>>>>>> have to set it before starting the shell. See the configuration page
>>>>>> <http://spark.incubator.apache.org/docs/latest/configuration.html#environment-variables>
>>>>>> to see how to set Java properties through environment variables.
>>>>>>
>>>>>> 2) Regarding correctness of computation, if it works locally it
>>>>>> should work on the cluster. Regarding parallelism, Spark runs only as many
>>>>>> tasks simultaneously as there are cores that the SparkContext has asked
>>>>>> for. So on your local machine, it doesn't help much to increase the number
>>>>>> of tasks too much, as it only has 4 cores (or 8 hyperthreaded ones, maybe).
>>>>>> Having 10x more tasks with each task doing 1/10th the computation ends up
>>>>>> taking more or less the same amount of processing time. With more cores in
>>>>>> a cluster, it will parallelize further. The number of tasks should be
>>>>>> configured to be about 2-3x the number of cores in the cluster. See the
>>>>>> tuning page
>>>>>> <http://spark.incubator.apache.org/docs/latest/tuning.html#level-of-parallelism>.
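>>>>>>
>>>>>> A quick sketch of bumping the parallelism on your existing job (the
>>>>>> partition count here is only an illustrative number, roughly 2-3x your
>>>>>> core count):
>>>>>>
>>>>>>   val numPartitions = 16   // example: ~2-3x the cores in the cluster
>>>>>>   val counts = textFile.map(line => getArray(line))
>>>>>>     .flatMap(a => getCombinations(a))
>>>>>>     .reduceByKey(_ + _, numPartitions)   // second argument sets the number of reduce tasks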
>>>>>>
>>>>>> TD
>>>>>>
>>>>>>
>>>>>> On Sun, Jan 19, 2014 at 1:30 PM, Brad Ruderman <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Tathagata-
>>>>>>> Thanks for your response. I appreciate your help. A few follow-up
>>>>>>> questions:
>>>>>>> 1) I updated to master and tried to set the system properties.
>>>>>>> However, when I view the properties in the web console, I don't see these
>>>>>>> listed. Also I see that in Activity Monitor,
>>>>>>> org.apache.spark.repl.Main is only taking 758mb.
>>>>>>>
>>>>>>> scala> System.setProperty("spark.executor.memory","4gb")
>>>>>>> res0: String = null
>>>>>>>
>>>>>>> scala> System.setProperty("spark.executor.memory","4gb")
>>>>>>> res1: String = 4gb
>>>>>>>
>>>>>>> Did something change in master that would prevent this from being
>>>>>>> set higher in standalone mode? My exact repo is:
>>>>>>>
>>>>>>> git clone [email protected]:apache/incubator-spark.git
>>>>>>> ./sbt/sbt assembly
>>>>>>> cd conf
>>>>>>> cp spark-env.sh.template spark-env.sh
>>>>>>> cp log4j.properties.template log4j.properties
>>>>>>> ./bin/spark-shell
>>>>>>>
>>>>>>> 2) My next steps are going to be to test this job in an Amazon EC2
>>>>>>> cluster, but is there anything I can do locally to ensure that it is
>>>>>>> properly built so that it can be heavily parallelized? I tried increasing
>>>>>>> the level of parallelism as well by passing a parameter to reduceByKey,
>>>>>>> but I didn't see any difference in performance. My local computer also
>>>>>>> doesn't seem to suffer as a result (which I think it would; I am running a
>>>>>>> MB Pro Retina i7 + 16gb RAM). I am worried that using ListBuffer or
>>>>>>> HashMap will significantly slow things down and that there are better ways
>>>>>>> to do this.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Brad
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jan 18, 2014 at 7:11 PM, Tathagata Das <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hello Brad,
>>>>>>>>
>>>>>>>> The shuffle operation seems to be taking too much memory, more than
>>>>>>>> what your Java process can provide. I am not sure whether you have already
>>>>>>>> tried them or not, but there are a few basic things you can try.
>>>>>>>> 1. If you are running a local standalone Spark cluster, you can set
>>>>>>>> the amount of memory that the worker can use by setting
>>>>>>>> spark.executor.memory. See Spark configuration
>>>>>>>> <http://spark.incubator.apache.org/docs/latest/configuration.html>.
>>>>>>>> 2. You can increase the number of partitions created from the text
>>>>>>>> file, and increase the number of reducers. This lowers the memory usage of
>>>>>>>> each task. See the Spark tuning guide
>>>>>>>> <http://spark.incubator.apache.org/docs/latest/tuning.html#other-considerations>
>>>>>>>> (a short sketch of points 1 and 2 together follows below).
>>>>>>>> 3. If you are still running out of memory, you can try our latest Spark
>>>>>>>> 0.9 <https://github.com/apache/incubator-spark/tree/branch-0.9> (or
>>>>>>>> the master branch). Shuffles have been modified to automatically spill to
>>>>>>>> disk when they need more than the available memory. 0.9 is undergoing
>>>>>>>> community voting and will be released very soon.
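>>>>>>>>
>>>>>>>> A minimal sketch of points 1 and 2 together (the memory size and
>>>>>>>> partition counts are just illustrative, and the property has to be set
>>>>>>>> before the SparkContext is created, e.g. before starting the shell):
>>>>>>>>
>>>>>>>>   // in a standalone driver program, before the SparkContext is created:
>>>>>>>>   System.setProperty("spark.executor.memory", "4g")   // example value
>>>>>>>>
>>>>>>>>   val textFile = sc.textFile("slices.txt", 64)         // example: more input partitions
>>>>>>>>   val counts = textFile.map(line => getArray(line))
>>>>>>>>     .flatMap(a => getCombinations(a))
>>>>>>>>     .reduceByKey(_ + _, 64)                            // example: more reduce tasks
>>>>>>>>   counts.saveAsTextFile("spark_test")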
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Jan 18, 2014 at 4:58 PM, Brad Ruderman <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi All-
>>>>>>>>> I am very new to Spark (and Scala) but I was hoping to get some
>>>>>>>>> help creating a distributed job. Essentially I have a set of files
>>>>>>>>> (8k at
>>>>>>>>> 300mb each in HDFS) which was created in Hive (Hadoop) with the form:
>>>>>>>>> Int
>>>>>>>>> Array<Map<int,string>>
>>>>>>>>>
>>>>>>>>> I want to create a Spark job that reads these files and creates the
>>>>>>>>> combinations of the int array. Meaning it iterates over every int
>>>>>>>>> (ignoring the string) in the Array<Map<int,string>> and produces a [k,v]
>>>>>>>>> of ((int_1, int_2), 1). Then it reduces by the k and sums the v to
>>>>>>>>> produce a final result over a large dataset of:
>>>>>>>>> (int_1, int_2), count
>>>>>>>>>
>>>>>>>>> It is producing a set of all possible combinations for that given
>>>>>>>>> record. In order to do this I have locally downloaded a single file and am
>>>>>>>>> running Spark locally on my computer to try to process that single file.
>>>>>>>>> The file is 382715 lines (~400mb), however some of the arrays can be
>>>>>>>>> quite big.
>>>>>>>>>
>>>>>>>>> My Spark Job looks like the following:
>>>>>>>>>
>>>>>>>>> import collection.mutable.HashMap
>>>>>>>>> import collection.mutable.ListBuffer
>>>>>>>>>
>>>>>>>>> def getCombinations(a: List[Int]) : HashMap[(Int,Int),Int] = {
>>>>>>>>> val comboMap = new HashMap[(Int,Int),Int]()
>>>>>>>>> var aSorted = a.sorted
>>>>>>>>> for (x <- 0 to aSorted.length - 1){
>>>>>>>>> var b = aSorted(0)
>>>>>>>>> aSorted = aSorted.drop(1)
>>>>>>>>> for (y <- 0 to aSorted.length - 1){
>>>>>>>>> comboMap((b, aSorted(y))) = 1
>>>>>>>>> }
>>>>>>>>> }
>>>>>>>>> return comboMap
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> def getArray(line: String):List[Int] = {
>>>>>>>>> var a = line.split("\\x01")(1).split("\\x02")
>>>>>>>>> var ids = new ListBuffer[Int]
>>>>>>>>> for (x <- 0 to a.length - 1){
>>>>>>>>> ids += Integer.parseInt(a(x).split("\\x03")(0))
>>>>>>>>> }
>>>>>>>>> return ids.toList
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> val textFile = sc.textFile("slices.txt")
>>>>>>>>> val counts = textFile.map(line => getArray(line)).flatMap(a =>
>>>>>>>>> getCombinations(a)).reduceByKey(_ + _)
>>>>>>>>> counts.saveAsTextFile("spark_test")
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I am using getArray() to process the text file from its Hive
>>>>>>>>> storage form, and then getCombinations() to create the k,v list, and
>>>>>>>>> finally reducing over this list. Eventually I will move this to a
>>>>>>>>> distributed cluster, but I was hoping to determine if there is any way to
>>>>>>>>> parallelize or optimize this job for my local computer. Currently it fails
>>>>>>>>> with:
>>>>>>>>>
>>>>>>>>> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java
>>>>>>>>> heap space)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.util.AppendOnlyMap.growTable(AppendOnlyMap.scala:200)
>>>>>>>>> org.apache.spark.util.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:180)
>>>>>>>>> org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:130)
>>>>>>>>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
>>>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>>>>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>>>>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>>>>>>> Thanks,
>>>>>>>>> Brad
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>