Patrick, you got me thinking, but I'm sticking to my opinion that
reduceByKey should be avoided if possible. I tried some timings:
def time[T](code: => T) = {
  val t0 = System.nanoTime: Double
  val res = code
  val t1 = System.nanoTime: Double
  println("Elapsed time " + (t1 - t0) / 1000000.0 + " msecs")
  res
}
val sparsity = .001
val rows = sc.parallelize(1 to 10000000).mapPartitionsWithIndex( (id, it) => {
  val rng = new scala.util.Random(id + 42)
  it.map(row => (0 until 10000).filter(i => rng.nextDouble > 1 - sparsity).map(i => (i, 1)))
}).map(_.toArray).cache
val rowsFlat = rows.flatMap(rr => rr).cache
rows.count
rowsFlat.count
val cSums1 = time(rowsFlat.reduceByKey(_ + _).collect.sortBy(_._1))
//Elapsed time 725.394917 msecs
val cSums2 = time(rows.mapPartitions(it =>
  Array(it.foldLeft(Array.fill(10000)(0))((acc, nn) => { nn.foreach(tt => acc(tt._1) += 1); acc })).iterator
).reduce((r1, r2) => r1.zip(r2).map(tt => tt._1 + tt._2)))
//Elapsed time 206.962364 msecs
These are the best times over a small number of runs, but average case
showed the same behavior.
The merge reduction I had suggested was not even close, which doesn't
surprise me much on second thought.
At sparsity = .01, the times are 2447 msecs vs. 394 msecs.
Lesson 1: You would care about this in an iterative algorithm, but not in a
one-off application.
Lesson 2: Shuffle is slow in comparison, even for a small number of
elements.
Lesson 3: Spark would be even cooler with highly optimized reduce and
broadcast.
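To make Lesson 2 concrete outside Spark, here is a small plain-Scala sketch of the dense-accumulator pattern above (toy data standing in for partitions; all names are illustrative):

```scala
// Two toy "partitions" of sparse rows; each row is a list of (columnIndex, 1) pairs.
val partitions: Seq[Seq[Seq[(Int, Int)]]] = Seq(
  Seq(Seq((0, 1), (3, 1)), Seq((3, 1))),
  Seq(Seq((1, 1)), Seq((0, 1), (1, 1)))
)
val numCols = 4

// Per-partition step: fold all of a partition's rows into one dense count
// array. This happens locally, with no shuffle.
def partitionCounts(rows: Seq[Seq[(Int, Int)]]): Array[Int] =
  rows.foldLeft(Array.fill(numCols)(0)) { (acc, row) =>
    row.foreach { case (i, _) => acc(i) += 1 }
    acc
  }

// Merge step: element-wise sum of the per-partition arrays (what the final
// reduce does across partitions).
def merge(a: Array[Int], b: Array[Int]): Array[Int] =
  a.zip(b).map { case (x, y) => x + y }

val colCounts = partitions.map(partitionCounts).reduce(merge)
```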
On Fri, Nov 8, 2013 at 7:01 PM, Philip Ogren <[email protected]> wrote:
> Thank you for the pointers. I'm not sure I was able to fully understand
> either of your suggestions but here is what I came up with. I started with
> Tom's code but I think I ended up borrowing from Patrick's suggestion too.
> Any thoughts about my updated solution are more than welcome! I added
> local variable types for clarity.
>
> def countPopulatedColumns(tsv: RDD[String]): RDD[(Int, Int)] = {
>   //split by tab and zip with index to give (column value, column index) pairs
>   val sparse: RDD[(String, Int)] = tsv.flatMap(line => line.split("\t").zipWithIndex)
>   //filter out all the zero-length values
>   val dense: RDD[(String, Int)] = sparse.filter(valueIndex => valueIndex._1.length > 0)
>   //map each column index to one and do the usual reduction
>   dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_ + _)
> }
>
> Of course, this can be condensed to a single line but it doesn't seem as
> easy to read as the more verbose code above. Write-once code like the
> following is why I never liked Perl....
>
> def cpc(tsv: RDD[String]): RDD[(Int, Int)] = {
>   tsv.flatMap(_.split("\t").zipWithIndex).filter(ci => ci._1.length > 0).map(ci => (ci._2, 1)).reduceByKey(_ + _)
> }
>
> Thanks,
> Philip
>
>
>
> On 11/8/2013 2:41 PM, Patrick Wendell wrote:
>
>> Hey Tom,
>>
>> reduceByKey will reduce locally on all the nodes, so there won't be
>> any data movement except to combine totals at the end.
>>
>> - Patrick
>>
>> On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek <[email protected]> wrote:
>>
>>> Your example requires each row to be exactly the same length, since
>>> zipped
>>> will truncate to the shorter of its two arguments.
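A quick illustration of the truncation, using zip, which behaves the same way as zipped here:

```scala
val row1 = List(1, 1, 0)
val row2 = List(0, 1)  // a shorter row
// zip stops at the shorter list, silently dropping row1's last element
val sums = row1.zip(row2).map { case (a, b) => a + b }
```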
>>>
>>> The second solution is elegant, but reduceByKey involves flying a bunch
>>> of
>>> data around to sort the keys. I suspect it would be a lot slower. But
>>> you
>>> could save yourself from adding up a bunch of zeros:
>>>
>>> val sparseRows = spark.textFile("myfile.tsv").map(line =>
>>> line.split("\t").zipWithIndex.filter(_._1.length>0))
>>> sparseRows.reduce(mergeAdd(_,_))
>>>
>>> You'll have to write a mergeAdd function. This might not be any faster,
>>> but
>>> it does allow variable length rows.
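For the record, a rough sketch of what mergeAdd might look like, assuming each row has first been mapped to index-sorted (columnIndex, count) pairs rather than the (value, index) pairs above:

```scala
// Merge two index-sorted (columnIndex, count) lists, adding the counts for
// indices that appear in both; indices unique to one side pass through.
def mergeAdd(a: List[(Int, Int)], b: List[(Int, Int)]): List[(Int, Int)] =
  (a, b) match {
    case (Nil, ys) => ys
    case (xs, Nil) => xs
    case ((i, m) :: xs, (j, n) :: ys) =>
      if (i < j) (i, m) :: mergeAdd(xs, b)
      else if (j < i) (j, n) :: mergeAdd(a, ys)
      else (i, m + n) :: mergeAdd(xs, ys)
  }
```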
>>>
>>>
>>> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <[email protected]>
>>> wrote:
>>>
>>>> It would be a bit more straightforward to write it like this:
>>>>
>>>> val columns = [same as before]
>>>>
>>>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
>>>> column).reduceByKey(_ + _)
>>>>
>>>> Basically look at each row and emit several records using flatMap.
>>>> Each record has an ID for the column (maybe its index) and a flag for
>>>> whether it's present.
>>>>
>>>> Then you reduce by key to get the per-column count. Then you can
>>>> collect at the end.
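The same pipeline works on plain Scala collections, which may help see the shape of it (toy data; groupBy plus a sum standing in for reduceByKey):

```scala
// Toy tab-delimited lines; empty cells mean the column is not populated.
val lines = Seq("a\tb\t", "\tb\tc", "a\t\tc")

val counts: Map[Int, Int] = lines
  .flatMap(_.split("\t", -1).zipWithIndex)         // emit (value, columnIndex) per cell
  .collect { case (v, i) if v.nonEmpty => (i, 1) } // keep populated cells as (index, 1)
  .groupBy(_._1)                                   // local stand-in for reduceByKey
  .map { case (i, ones) => (i, ones.size) }        // per-column count
```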
>>>>
>>>> - Patrick
>>>>
>>>> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Spark coders,
>>>>>
>>>>> I wrote my first little Spark job that takes columnar data and counts
>>>>> up
>>>>> how
>>>>> many times each column is populated in an RDD. Here is the code I came
>>>>> up
>>>>> with:
>>>>>
>>>>> //RDD of List[String] corresponding to tab-delimited values
>>>>> val columns = spark.textFile("myfile.tsv").map(line => line.split("\t").toList)
>>>>> //RDD of List[Int] corresponding to populated columns (1 for populated and 0 for not populated)
>>>>> val populatedColumns = columns.map(row => row.map(column => if (column.length > 0) 1 else 0))
>>>>> //List[Int] containing the sums of the 1's in each column
>>>>> val counts = populatedColumns.reduce((row1, row2) => (row1, row2).zipped.map(_ + _))
>>>>>
>>>>> Any thoughts about the fitness of this code snippet? I'm a little
>>>>> annoyed
>>>>> by creating an RDD full of 1's and 0's in the second line. The if
>>>>> statement
>>>>> feels awkward too. I was happy to find the zipped method for the
>>>>> reduce
>>>>> step. Any feedback you might have on how to improve this code is
>>>>> appreciated. I'm a newbie to both Scala and Spark.
>>>>>
>>>>> Thanks,
>>>>> Philip
>>>>>
>>>>>
>>>
>