Thanks Nick, I really appreciate your help. I understand the iterator
approach now and have modified the code to return an iterator. Here is the
code and what it returns:
import collection.mutable.HashMap
import collection.mutable.ListBuffer
def getArray(line: String): List[Int] = {
  var a = line.split("\\x01")(1).split("\\x02")
  var ids = new ListBuffer[Int]
  for (x <- 0 to a.length - 1) {
    ids += Integer.parseInt(a(x).split("\\x03")(0))
  }
  return ids.toList
}
var textFile = sc.textFile("input.txt")
val counts = textFile.mapPartitions(lines => {
  val hashmap = new HashMap[(Int,Int),Int]()
  lines.foreach(line => {
    val array = getArray(line)
    for ((x, i) <- array.view.zipWithIndex) {
      for (j <- (i + 1) to array.length - 1) {
        hashmap((x, array(j))) = hashmap.getOrElse((x, array(j)), 0) + 1
      }
    }
  })
  Iterator.single(hashmap)
})
counts: org.apache.spark.rdd.RDD[scala.collection.mutable.HashMap[(Int,
Int),Int]] = MapPartitionsRDD[18] at mapPartitions at <console>:26
However, with the original flatMap version some kind of implicit conversion
is happening, since it returns:
counts: org.apache.spark.rdd.RDD[((Int, Int), Int)] = FlatMappedRDD[22] at
flatMap at <console>:31
Thus I cannot call reduceByKey(_ + _) on the mapPartitions result, since I
get this error:
<console>:35: error: value reduceByKey is not a member of
org.apache.spark.rdd.RDD[scala.collection.mutable.HashMap[(Int, Int),Int]]
counts.reduceByKey(_ + _)
I believe there is some kind of implicit conversion happening from a HashMap
to a sequence of ((Int, Int), Int) pairs. However, I can't seem to mimic this
conversion (implicitly or explicitly) in the mapPartitions version. I am
running this within the spark shell, so importing SparkContext or the Java
conversions shouldn't be necessary.
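To make it concrete: what I think I need is for counts to end up as an
RDD[((Int, Int), Int)] rather than an RDD of HashMaps. My guess (untested) is
that this means either replacing the last line of the mapPartitions block, or
flattening afterwards, along these lines:
// inside mapPartitions, instead of Iterator.single(hashmap):
hashmap.iterator  // emits the ((Int, Int), Int) pairs themselves
// or keep Iterator.single(hashmap) and flatten the RDD afterwards:
counts.flatMap(m => m).reduceByKey(_ + _)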
Once again thanks for all your help. As you can tell I am very new to Scala
and Spark.
Thanks,
Brad
On Tue, Jan 21, 2014 at 12:14 PM, Nick Pentreath
<[email protected]> wrote:
> mapPartitions expects an iterator return value. If within each partition
> you're aggregating the hash map, you can use your foreach operation (or
> map) to aggregate values in the hashmap, and then return Iterator.single (I
> think, can't quite remember the syntax).
>
> Thus each mapPartitions will return an iterator containing a single
> hashmap, that in turn contains the aggregate for the partition. Then
> perform a reduceByKey operation on the (Int, Int) keys with hashmap values
> for a combined count.
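> Roughly like this, reusing your mutable HashMap import (writing from
> memory, so the exact calls may need adjusting):
>
> val counts = textFile.mapPartitions(lines => {
>   val hashmap = new HashMap[(Int, Int), Int]()  // one map per partition
>   lines.foreach(line => {
>     // aggregate the pair counts for this line into hashmap here
>   })
>   Iterator.single(hashmap)  // an iterator containing the single hashmap
> })
> // flatten each partition's map into ((Int, Int), Int) pairs, then combine
> counts.flatMap(_.iterator).reduceByKey(_ + _)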
>
> You may want to take a look at Algebird (
> https://github.com/twitter/algebird) which contains monoids for map
> aggregation which may be useful.
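> For instance, something along these lines (from memory, so double-check
> against the Algebird docs):
>
> import com.twitter.algebird._
> // the map monoid sums the values of matching keys
> val m1 = Map((1, 2) -> 3, (1, 3) -> 1)
> val m2 = Map((1, 2) -> 2)
> Monoid.plus(m1, m2)  // should give Map((1,2) -> 5, (1,3) -> 1)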
>
> —
> Sent from Mailbox <https://www.dropbox.com/mailbox> for iPhone
>
>
> On Tue, Jan 21, 2014 at 10:05 PM, Brad Ruderman <[email protected]> wrote:
>
>>
>> Hi Tathagata-
>> Thanks for your help. I appreciate your suggestions. I am having trouble
>> following the code for mapPartitions. According to the documentation I need
>> to pass a function that returns an Iterator[T], and thus I receive an error
>> on the foreach.
>>
>> var textFile = sc.textFile("input.txt")
>> textFile.mapPartitions(lines => {
>>   lines.foreach(line => {
>>     println(line)
>>   })
>> })
>> <console>:18: error: type mismatch;
>> found : Unit
>> required: Iterator[?]
>> Error occurred in an application involving default arguments.
>> lines.foreach( line => {
>>
>> Therefore I think I need to run a map, but I am not sure whether I should
>> reduce within the mapPartitions lambda or outside it.
>>
>> val counts = textFile.mapPartitions(lines => {
>>   val hashmap = new HashMap[(Int,Int),Int]()
>>   lines.map(line => {
>>     val array = getArray(line)
>>     for (x <- 0 to array.length - 1) {
>>       for (y <- 0 to array.length - 1) {
>>         hashmap((array(x), array(y))) = 1
>>       }
>>     }
>>   })
>> })
>>
>> In this case the hashmap is not accessible outside the partitions, since
>> when I save to a file it is empty. Could you please clarify how to use
>> mapPartitions()?
>>
>> Thanks,
>> Brad
>>
>> On Mon, Jan 20, 2014 at 6:41 PM, Tathagata Das <
>> [email protected]> wrote:
>>
>>> Well, right now it is quite parallelized, as each line is processed
>>> independently of the others, with a single reduce doing the final counting.
>>> You can optimize the combination generation in several ways.
>>>
>>> 1. I may be missing something, but do you need to sort the list to
>>> aSorted? That may be costly for large lists.
>>>
>>> 2. Furthermore, dropping from the sorted sequence is probably costly.
>>> Just walking through the list with indices is likely to be much more
>>> efficient (a fuller sketch follows after point 3):
>>>
>>> for (xi <- 0 until length) {
>>>   for (yi <- xi + 1 until length) {
>>>     ...
>>>   }
>>> }
>>>
>>> 3. Creating and destroying hashmaps for every array of numbers, that is,
>>> for every line in the file, is not very efficient. Using mapPartitions is
>>> better, as you can create one hashmap for the entire partition of data, and
>>> process the whole partition of the file in one go.
>>>
>>> textFile.mapPartitions(lines => {
>>>   val hashmap = ...
>>>   lines.foreach(line => {
>>>     val array = getArray(line)
>>>     // add combinations from array
>>>   })
>>>   hashmap
>>> })
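>>>
>>> To make point 2 concrete, getCombinations could become something like
>>> this (just a sketch, untested):
>>>
>>> import collection.mutable.HashMap
>>>
>>> def getCombinations(a: List[Int]): HashMap[(Int, Int), Int] = {
>>>   val comboMap = new HashMap[(Int, Int), Int]()
>>>   // keep the sort for now (see point 1); an array gives O(1) indexing
>>>   val arr = a.sorted.toArray
>>>   for (xi <- 0 until arr.length; yi <- xi + 1 until arr.length) {
>>>     comboMap((arr(xi), arr(yi))) = 1
>>>   }
>>>   comboMap
>>> }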
>>>
>>> Hope this helps.
>>>
>>> TD
>>>
>>>
>>> On Mon, Jan 20, 2014 at 4:53 PM, Brad Ruderman
>>> <[email protected]> wrote:
>>>
>>>> Thanks again, Tathagata.
>>>>
>>>> I successfully launched an EC2 cluster with 5 nodes. I have split the
>>>> tasks out and found that a lot of time is being spent in:
>>>>
>>>> def getCombinations(a: List[Int]): HashMap[(Int,Int),Int] = {
>>>>   val comboMap = new HashMap[(Int,Int),Int]()
>>>>   var aSorted = a.sorted
>>>>   for (x <- 0 to aSorted.length - 1) {
>>>>     var b = aSorted(0)
>>>>     aSorted = aSorted.drop(1)
>>>>     for (y <- 0 to aSorted.length - 1) {
>>>>       comboMap((b, aSorted(y))) = 1
>>>>     }
>>>>   }
>>>>   return comboMap
>>>> }
>>>>
>>>> which is called from:
>>>> val counts = textFile.map(line => getArray(line))
>>>>   .flatMap(a => getCombinations(a))
>>>>   .reduceByKey(_ + _)
>>>>
>>>> Essentially I am trying to perform a "semi-cartesian" product: each
>>>> element within an inner array is joined against all other elements within
>>>> that same array. So if I have [[1,2,3],[2,3,5]] that would become:
>>>> 1,2
>>>> 1,3
>>>> 2,3
>>>> 2,3
>>>> 2,5
>>>> 3,5
>>>>
>>>> which would reduce down to
>>>> 1,2 1
>>>> 1,3 1
>>>> 2,3 2
>>>> 2,5 1
>>>> 3,5 1
>>>>
>>>> I am wondering if there is a faster method that could be better
>>>> parallelized than what I am doing.
>>>>
>>>> Thanks,
>>>> Brad
>>>>
>>>> On Sun, Jan 19, 2014 at 2:25 PM, Tathagata Das <
>>>> [email protected]> wrote:
>>>>
>>>>> 1) System properties must be set before creating the SparkContext.
>>>>> When using the spark shell (which creates the context automatically), you
>>>>> have to set them before starting the shell. See the configuration page
>>>>> <http://spark.incubator.apache.org/docs/latest/configuration.html#environment-variables>
>>>>> to see how to set Java properties through environment variables. (A
>>>>> sketch of the standalone-program case follows after point 2.)
>>>>>
>>>>> 2) Regarding correctness of computation, if it works locally it should
>>>>> work on the cluster. Regarding parallelism, Spark runs only as many tasks
>>>>> simultaneously as there are cores that the SparkContext has asked for. So
>>>>> on your local machine, it doesn't help much to increase the number of
>>>>> tasks too much, as it only has 4 cores (or 8 hyperthreaded ones, maybe).
>>>>> Having 10x more tasks with each task doing 1/10th the computation ends up
>>>>> taking more or less the same amount of processing time. With more cores in
>>>>> a cluster, it will parallelize further. The number of tasks should be
>>>>> configured to be about 2-3x the number of cores in the cluster. See the
>>>>> tuning page
>>>>> <http://spark.incubator.apache.org/docs/latest/tuning.html#level-of-parallelism>.
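>>>>>
>>>>> Re point 1, in a standalone program (as opposed to the shell) it would
>>>>> look roughly like this sketch:
>>>>>
>>>>> import org.apache.spark.SparkContext
>>>>> // properties must be set before the SparkContext is constructed
>>>>> System.setProperty("spark.executor.memory", "4g")
>>>>> // master and app name here are just placeholders
>>>>> val sc = new SparkContext("local[4]", "pair-counts")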
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>> On Sun, Jan 19, 2014 at 1:30 PM, Brad Ruderman <[email protected]
>>>>> > wrote:
>>>>>
>>>>>> Hi Tathagata-
>>>>>> Thanks for your response. I appreciate your help. A few follow-up
>>>>>> questions:
>>>>>> 1) I updated to master and tried to set the system properties.
>>>>>> However when I view the properties in the web console, I don't see these
>>>>>> listed. Also I see in Activity Monitor that org.apache.spark.repl.Main is
>>>>>> only taking 758mb.
>>>>>>
>>>>>> scala> System.setProperty("spark.executor.memory","4gb")
>>>>>> res0: String = null
>>>>>>
>>>>>> scala> System.setProperty("spark.executor.memory","4gb")
>>>>>> res1: String = 4gb
>>>>>>
>>>>>> Did something change in master that would prevent this from being set
>>>>>> higher in standalone mode? My exact repro steps are:
>>>>>>
>>>>>> git clone [email protected]:apache/incubator-spark.git
>>>>>> ./sbt/sbt assembly
>>>>>> cd conf
>>>>>> cp spark-env.sh.template spark-env.sh
>>>>>> cp log4j.properties.template log4j.properties
>>>>>> ./bin/spark-shell
>>>>>>
>>>>>> 2) My next steps are going to be to test this job in an AMZ EC2
>>>>>> cluster, but is there anything I can do locally to ensure that it is
>>>>>> properly built so that it can be heavily parallelized? I tried increasing
>>>>>> the level of parallelism as well by passing a parallelism parameter to
>>>>>> reduceByKey, but I didn't see any difference in performance. My local
>>>>>> computer also doesn't seem to suffer as a result (which I would expect it
>>>>>> to; I am running a MB Pro Retina i7 + 16gb RAM). I am worried that using
>>>>>> ListBuffer or HashMap will significantly slow things down and that there
>>>>>> are better ways to do this.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Brad
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Jan 18, 2014 at 7:11 PM, Tathagata Das <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hello Brad,
>>>>>>>
>>>>>>> The shuffle operation seems to be taking too much memory, more than
>>>>>>> what your Java process can provide. I am not sure whether you have
>>>>>>> already tried these or not, but there are a few basic things you can try.
>>>>>>> 1. If you are running a local standalone Spark cluster, you can set
>>>>>>> the amount of memory that the worker can use by setting
>>>>>>> spark.executor.memory. See the Spark configuration page
>>>>>>> <http://spark.incubator.apache.org/docs/latest/configuration.html>.
>>>>>>> 2. You can increase the number of partitions created from the text
>>>>>>> file, and increase the number of reducers; this lowers the memory usage
>>>>>>> of each task (a sketch follows after point 3). See the Spark tuning guide
>>>>>>> <http://spark.incubator.apache.org/docs/latest/tuning.html#other-considerations>.
>>>>>>> 3. If you are still running out of memory, you can try our latest Spark
>>>>>>> 0.9 <https://github.com/apache/incubator-spark/tree/branch-0.9> (or
>>>>>>> the master branch). Shuffles have been modified to automatically spill to
>>>>>>> disk when they need more than the available memory. 0.9 is undergoing
>>>>>>> community voting and will be released very soon.
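>>>>>>>
>>>>>>> Re point 2, a sketch of what that could look like (the numbers are just
>>>>>>> illustrative):
>>>>>>>
>>>>>>> val textFile = sc.textFile("slices.txt", 64)  // request more input partitions
>>>>>>> val counts = textFile.map(line => getArray(line))
>>>>>>>   .flatMap(a => getCombinations(a))
>>>>>>>   .reduceByKey(_ + _, 64)  // more reduce tasks, so smaller per-task state
>>>>>>> counts.saveAsTextFile("spark_test")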
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jan 18, 2014 at 4:58 PM, Brad Ruderman <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi All-
>>>>>>>> I am very new to Spark (and Scala) but I was hoping to get some
>>>>>>>> help creating a distributed job. Essentially I have a set of files (8k
>>>>>>>> at 300mb each in HDFS) which were created in Hive (Hadoop) with the form:
>>>>>>>> Int
>>>>>>>> Array<Map<int,string>>
>>>>>>>>
>>>>>>>> I want to create a Spark job that reads these files and creates
>>>>>>>> combinations from the int array. Meaning it iterates over every int in
>>>>>>>> the Array<Map<int,string>>, ignoring the string, and produces a [k,v] of
>>>>>>>> (int_1, int_2), 1. Then it reduces by the k and sums the v to produce a
>>>>>>>> final result over a large dataset of:
>>>>>>>> (int_1, int_2), count
>>>>>>>>
>>>>>>>> It is producing a set of all possible combinations for that given
>>>>>>>> record. In order to do this I have downloaded a single file locally and
>>>>>>>> am running Spark on my computer to try to process it. The file is 382715
>>>>>>>> lines (~400mb), however some of the arrays can be quite big.
>>>>>>>>
>>>>>>>> My Spark Job looks like the following:
>>>>>>>>
>>>>>>>> import collection.mutable.HashMap
>>>>>>>> import collection.mutable.ListBuffer
>>>>>>>>
>>>>>>>> // pairs each element of the sorted list with every later element
>>>>>>>> def getCombinations(a: List[Int]): HashMap[(Int,Int),Int] = {
>>>>>>>>   val comboMap = new HashMap[(Int,Int),Int]()
>>>>>>>>   var aSorted = a.sorted
>>>>>>>>   for (x <- 0 to aSorted.length - 1) {
>>>>>>>>     var b = aSorted(0)
>>>>>>>>     aSorted = aSorted.drop(1)
>>>>>>>>     for (y <- 0 to aSorted.length - 1) {
>>>>>>>>       comboMap((b, aSorted(y))) = 1
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>   return comboMap
>>>>>>>> }
>>>>>>>>
>>>>>>>> // extracts the int ids from a control-character-delimited Hive line
>>>>>>>> def getArray(line: String): List[Int] = {
>>>>>>>>   var a = line.split("\\x01")(1).split("\\x02")
>>>>>>>>   var ids = new ListBuffer[Int]
>>>>>>>>   for (x <- 0 to a.length - 1) {
>>>>>>>>     ids += Integer.parseInt(a(x).split("\\x03")(0))
>>>>>>>>   }
>>>>>>>>   return ids.toList
>>>>>>>> }
>>>>>>>>
>>>>>>>> val textFile = sc.textFile("slices.txt")
>>>>>>>> val counts = textFile.map(line => getArray(line))
>>>>>>>>   .flatMap(a => getCombinations(a))
>>>>>>>>   .reduceByKey(_ + _)
>>>>>>>> counts.saveAsTextFile("spark_test")
>>>>>>>>
>>>>>>>>
>>>>>>>> I am using getArray() to process the text file from its Hive
>>>>>>>> storage form, then getCombinations() to create the (k, v) pairs, and
>>>>>>>> finally reducing over them. Eventually I will move this to a
>>>>>>>> distributed cluster, but I was hoping to determine if there is any way
>>>>>>>> to parallelize or optimize this job for my local computer. Currently it
>>>>>>>> fails with:
>>>>>>>>
>>>>>>>> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java heap
>>>>>>>> space)
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.util.AppendOnlyMap.growTable(AppendOnlyMap.scala:200)
>>>>>>>> org.apache.spark.util.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:180)
>>>>>>>> org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:130)
>>>>>>>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
>>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>>>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>>>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Brad
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>