Hey Philip,

Your code is exactly what I was suggesting. I didn't explain it
clearly, when I said "emit XX", I just meant "figure out how to do XX
and return the result" there isn't actually a function called 'emit'.

In your case, the correct way to do it was using zipWithIndex... I
just couldn't remember off the top of my head how to do that.

- Patrick

On Sat, Nov 9, 2013 at 4:41 AM, Tom Vacek <[email protected]> wrote:
> Patrick, you got me thinking, but I'm sticking to my opinion that
> reduceByKey should be avoided if possible.  I tried some timings:
>
> def time[T](code : => T) =  {
>         val t0 = System.nanoTime : Double
>         val res = code
>         val t1 = System.nanoTime : Double
>         println("Elapsed time " + (t1 - t0) / 1000000.0 + " msecs")
>         res
> }
> val sparsity=.001
> val rows = sc.parallelize(1 to 10000000).mapPartitionsWithIndex( (id, it) =>
> {val rng = new scala.util.Random(id+42); it.map(row => (0 until
> 10000).filter(i => rng.nextDouble>1-sparsity).map(i => (i,1)) )}
> ).map(_.toArray).cache
> val rowsFlat = rows.flatMap(rr => rr).cache
>
> rows.count
> rowsFlat.count
>
> val cSums1 = time(rowsFlat.reduceByKey(_+_).collect.sortBy(_._1))
> //Elapsed time 725.394917 msecs
>
> val cSums2 = time( rows.mapPartitions(it =>
> Array(it.foldLeft(Array.fill(10000)(0))((acc,nn) =>
> {nn.foreach(tt=>acc(tt._1)+=1); acc})).iterator).reduce( (r1,r2) =>
> r1.zip(r2).map(tt => tt._1 + tt._2)))
> //Elapsed time 206.962364 msecs
>
> These are the best times over a small number of runs, but average case
> showed the same behavior.
> The merge reduction I had suggested was not even close, which doesn't
> surprise me much on second thought.
>
> At sparsity=.01, the times are 2447 v. 394.
>
> Lesson 1: You would care about this in an iterative algorithm, but not in a
> one-off application.
> Lesson 2: Shuffle is slow in comparison, even for a small number of
> elements.
> Lesson 3: Spark would be even cooler with highly optimized reduce and
> broadcast.
>
>
>
> On Fri, Nov 8, 2013 at 7:01 PM, Philip Ogren <[email protected]>
> wrote:
>>
>> Thank you for the pointers.  I'm not sure I was able to fully understand
>> either of your suggestions but here is what I came up with.  I started with
>> Tom's code but I think I ended up borrowing from Patrick's suggestion too.
>> Any thoughts about my updated solution are more than welcome!  I added local
>> variable types for clarify.
>>
>>   def countPopulatedColumns(tsv: RDD[String]) : RDD[(Int, Int)] = {
>>     //split by tab and zip with index to give column value, column index
>> pairs
>>     val sparse : RDD[(String, Int)] = tsv.flatMap(line =>
>> line.split("\t").zipWithIndex)
>>     //filter out all the zero length values
>>     val dense : RDD[(String, Int)] = sparse.filter(valueIndex =>
>> valueIndex._1.length>0)
>>     //map each column index to one and do the usual reduction
>>     dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_+_)
>>   }
>>
>> Of course, this can be condensed to a single line but it doesn't seem as
>> easy to read as the more verbose code above.  Write-once code like the
>> following is why I never liked Perl....
>>
>>   def cpc(tsv: RDD[String]) : RDD[(Int, Int)] = {
>>     tsv.flatMap(_.split("\t").zipWithIndex).filter(ci =>
>> ci._1.length>0).map(ci => (ci._2, 1)).reduceByKey(_+_)
>>   }
>>
>> Thanks,
>> Philip
>>
>>
>>
>> On 11/8/2013 2:41 PM, Patrick Wendell wrote:
>>>
>>> Hey Tom,
>>>
>>> reduceByKey will reduce locally on all the nodes, so there won't be
>>> any data movement except to combine totals at the end.
>>>
>>> - Patrick
>>>
>>> On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek <[email protected]> wrote:
>>>>
>>>> Your example requires each row to be exactly the same length, since
>>>> zipped
>>>> will truncate to the shorter of its two arguments.
>>>>
>>>> The second solution is elegant, but reduceByKey involves flying a bunch
>>>> of
>>>> data around to sort the keys.  I suspect it would be a lot slower.  But
>>>> you
>>>> could save yourself from adding up a bunch of zeros:
>>>>
>>>>   val sparseRows = spark.textFile("myfile.tsv").map(line =>
>>>> line.split("\t").zipWithIndex.filter(_._1.length>0))
>>>> sparseRows.reduce(mergeAdd(_,_))
>>>>
>>>> You'll have to write a mergeAdd function.  This might not be any faster,
>>>> but
>>>> it does allow variable length rows.
>>>>
>>>>
>>>> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <[email protected]>
>>>> wrote:
>>>>>
>>>>> It would be a bit more straightforward to write it like this:
>>>>>
>>>>> val columns = [same as before]
>>>>>
>>>>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
>>>>> column).reduceByKey(_+ _)
>>>>>
>>>>> Basically look at each row and emit several records using flatMap.
>>>>> Each record has an ID for the column (maybe its index) and a flag for
>>>>> whether it's present.
>>>>>
>>>>> Then you reduce by key to get the per-column count. Then you can
>>>>> collect at the end.
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <[email protected]>
>>>>> wrote:
>>>>>>
>>>>>> Hi Spark coders,
>>>>>>
>>>>>> I wrote my first little Spark job that takes columnar data and counts
>>>>>> up
>>>>>> how
>>>>>> many times each column is populated in an RDD.  Here is the code I
>>>>>> came
>>>>>> up
>>>>>> with:
>>>>>>
>>>>>>      //RDD of List[String] corresponding to tab delimited values
>>>>>>      val columns = spark.textFile("myfile.tsv").map(line =>
>>>>>> line.split("\t").toList)
>>>>>>      //RDD of List[Int] corresponding to populated columns (1 for
>>>>>> populated
>>>>>> and 0 for not populated)
>>>>>>      val populatedColumns = columns.map(row => row.map(column =>
>>>>>> if(column.length > 0) 1 else 0))
>>>>>>      //List[Int] contains sums of the 1's in each column
>>>>>>      val counts = populatedColumns.reduce((row1,row2)
>>>>>> =>(row1,row2).zipped.map(_+_))
>>>>>>
>>>>>> Any thoughts about the fitness of this code snippet?  I'm a little
>>>>>> annoyed
>>>>>> by creating an RDD full of 1's and 0's in the second line.  The if
>>>>>> statement
>>>>>> feels awkward too.  I was happy to find the zipped method for the
>>>>>> reduce
>>>>>> step.  Any feedback you might have on how to improve this code is
>>>>>> appreciated.  I'm a newbie to both Scala and Spark.
>>>>>>
>>>>>> Thanks,
>>>>>> Philip
>>>>>>
>>>>
>>
>

Reply via email to