Patrick, you got me thinking, but I'm sticking to my opinion that
reduceByKey should be avoided if possible.  I tried some timings:

// Crude timing helper: run the block once and print elapsed wall-clock time.
def time[T](code: => T) = {
  val t0 = System.nanoTime: Double
  val res = code
  val t1 = System.nanoTime: Double
  println("Elapsed time " + (t1 - t0) / 1000000.0 + " msecs")
  res
}
val sparsity = .001
// Each row is a sparse collection of (columnIndex, 1) pairs over 10000 columns.
val rows = sc.parallelize(1 to 10000000).mapPartitionsWithIndex( (id, it) => {
  val rng = new scala.util.Random(id + 42)
  it.map(row => (0 until 10000).filter(i => rng.nextDouble > 1 - sparsity).map(i => (i, 1)))
}).map(_.toArray).cache
val rowsFlat = rows.flatMap(rr => rr).cache

rows.count
rowsFlat.count

val cSums1 = time(rowsFlat.reduceByKey(_+_).collect.sortBy(_._1))
//Elapsed time 725.394917 msecs

val cSums2 = time(
  rows.mapPartitions(it =>
    Array(it.foldLeft(Array.fill(10000)(0))((acc, nn) => {
      nn.foreach(tt => acc(tt._1) += 1); acc
    })).iterator
  ).reduce((r1, r2) => r1.zip(r2).map(tt => tt._1 + tt._2))
)
//Elapsed time 206.962364 msecs
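
Incidentally, the same per-partition accumulation could also be written with aggregate. This is only an untimed sketch, and cSums3 is just an illustrative name:

// Sketch: build a dense Array[Int] accumulator per partition, then merge the
// per-partition arrays on the driver. Mutating acc is local to each task,
// since every task works on its own copy of the zero value.
val cSums3 = time(
  rows.aggregate(Array.fill(10000)(0))(
    (acc, row) => { row.foreach(tt => acc(tt._1) += 1); acc },
    (a1, a2) => a1.zip(a2).map(tt => tt._1 + tt._2)
  )
)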

The 725 and 207 msec figures above are the best times over a small number of
runs, but the average case showed the same behavior.
The merge reduction I had suggested was not even close, which doesn't
surprise me much on second thought.
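
For the record, a merge reduction of that sort could look roughly like the following. This is a sketch only, assuming each row's (index, count) pairs are sorted by index; mergeAdd is just an illustrative name:

// Merge two sorted arrays of (columnIndex, count) pairs, adding counts where indices match.
def mergeAdd(a: Array[(Int, Int)], b: Array[(Int, Int)]): Array[(Int, Int)] = {
  val out = scala.collection.mutable.ArrayBuffer[(Int, Int)]()
  var i = 0
  var j = 0
  while (i < a.length && j < b.length) {
    if (a(i)._1 == b(j)._1) { out += ((a(i)._1, a(i)._2 + b(j)._2)); i += 1; j += 1 }
    else if (a(i)._1 < b(j)._1) { out += a(i); i += 1 }
    else { out += b(j); j += 1 }
  }
  while (i < a.length) { out += a(i); i += 1 }
  while (j < b.length) { out += b(j); j += 1 }
  out.toArray
}
// Used as rows.reduce(mergeAdd) to get per-column counts without a shuffle.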

At sparsity=.01, the reduceByKey vs. mapPartitions times are 2447 and 394 msecs.

Lesson 1: You would care about this in an iterative algorithm, but not in a
one-off application.
Lesson 2: Shuffle is slow in comparison, even for a small number of
elements.
Lesson 3: Spark would be even cooler with highly optimized reduce and
broadcast.



On Fri, Nov 8, 2013 at 7:01 PM, Philip Ogren <philip.og...@oracle.com> wrote:

> Thank you for the pointers.  I'm not sure I was able to fully understand
> either of your suggestions but here is what I came up with.  I started with
> Tom's code but I think I ended up borrowing from Patrick's suggestion too.
>  Any thoughts about my updated solution are more than welcome!  I added
> local variable types for clarity.
>
>   def countPopulatedColumns(tsv: RDD[String]) : RDD[(Int, Int)] = {
>     //split by tab and zip with index to give (column value, column index) pairs
>     val sparse : RDD[(String, Int)] = tsv.flatMap(line => line.split("\t").zipWithIndex)
>     //filter out all the zero length values
>     val dense : RDD[(String, Int)] = sparse.filter(valueIndex => valueIndex._1.length > 0)
>     //map each column index to one and do the usual reduction
>     dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_+_)
>   }
>
> Of course, this can be condensed to a single line but it doesn't seem as
> easy to read as the more verbose code above.  Write-once code like the
> following is why I never liked Perl....
>
>   def cpc(tsv: RDD[String]) : RDD[(Int, Int)] = {
>     tsv.flatMap(_.split("\t").zipWithIndex).filter(ci => ci._1.length > 0).map(ci => (ci._2, 1)).reduceByKey(_+_)
>   }
>
> Thanks,
> Philip
>
>
>
> On 11/8/2013 2:41 PM, Patrick Wendell wrote:
>
>> Hey Tom,
>>
>> reduceByKey will reduce locally on all the nodes, so there won't be
>> any data movement except to combine totals at the end.
>>
>> - Patrick
>>
>> On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek <minnesota...@gmail.com> wrote:
>>
>>> Your example requires each row to be exactly the same length, since
>>> zipped
>>> will truncate to the shorter of its two arguments.
>>>
>>> The second solution is elegant, but reduceByKey involves flying a bunch
>>> of
>>> data around to sort the keys.  I suspect it would be a lot slower.  But
>>> you
>>> could save yourself from adding up a bunch of zeros:
>>>
>>>   val sparseRows = spark.textFile("myfile.tsv").map(line =>
>>>     line.split("\t").zipWithIndex.filter(_._1.length > 0))
>>>   sparseRows.reduce(mergeAdd(_,_))
>>>
>>> You'll have to write a mergeAdd function.  This might not be any faster,
>>> but
>>> it does allow variable length rows.
>>>
>>>
>>> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <pwend...@gmail.com>
>>> wrote:
>>>
>>>> It would be a bit more straightforward to write it like this:
>>>>
>>>> val columns = [same as before]
>>>>
>>>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
>>>> column).reduceByKey(_+ _)
>>>>
>>>> Basically look at each row and emit several records using flatMap.
>>>> Each record has an ID for the column (maybe its index) and a flag for
>>>> whether it's present.
>>>>
>>>> Then you reduce by key to get the per-column count. Then you can
>>>> collect at the end.
>>>>
>>>> - Patrick
>>>>
>>>> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <philip.og...@oracle.com>
>>>> wrote:
>>>>
>>>>> Hi Spark coders,
>>>>>
>>>>> I wrote my first little Spark job that takes columnar data and counts up
>>>>> how many times each column is populated in an RDD.  Here is the code I
>>>>> came up with:
>>>>>
>>>>>      //RDD of List[String] corresponding to tab delimited values
>>>>>      val columns = spark.textFile("myfile.tsv").map(line => line.split("\t").toList)
>>>>>      //RDD of List[Int] corresponding to populated columns (1 for populated and 0 for not populated)
>>>>>      val populatedColumns = columns.map(row => row.map(column => if (column.length > 0) 1 else 0))
>>>>>      //List[Int] contains sums of the 1's in each column
>>>>>      val counts = populatedColumns.reduce((row1, row2) => (row1, row2).zipped.map(_+_))
>>>>>
>>>>> Any thoughts about the fitness of this code snippet?  I'm a little annoyed
>>>>> by creating an RDD full of 1's and 0's in the second line.  The if statement
>>>>> feels awkward too.  I was happy to find the zipped method for the reduce
>>>>> step.  Any feedback you might have on how to improve this code is
>>>>> appreciated.  I'm a newbie to both Scala and Spark.
>>>>>
>>>>> Thanks,
>>>>> Philip
>>>>>
>>>>>
>>>
>
