Patrick, you got me thinking, but I'm sticking to my opinion that reduceByKey should be avoided if possible. I tried some timings:
def time[T](code: => T) = {
  val t0 = System.nanoTime: Double
  val res = code
  val t1 = System.nanoTime: Double
  println("Elapsed time " + (t1 - t0) / 1000000.0 + " msecs")
  res
}

// 10M rows x 10k columns at ~0.1% density; each row is an Array of
// (column index, 1) pairs, and rowsFlat is the same data flattened.
val sparsity = 0.001
val rows = sc.parallelize(1 to 10000000).mapPartitionsWithIndex((id, it) => {
  val rng = new scala.util.Random(id + 42)
  it.map(row => (0 until 10000).filter(i => rng.nextDouble > 1 - sparsity).map(i => (i, 1)))
}).map(_.toArray).cache
val rowsFlat = rows.flatMap(rr => rr).cache
rows.count
rowsFlat.count

// Approach 1: shuffle-based reduceByKey over the flattened pairs.
val cSums1 = time(rowsFlat.reduceByKey(_ + _).collect.sortBy(_._1))
// Elapsed time 725.394917 msecs

// Approach 2: fold each partition into a dense 10k-element count array,
// then reduce the per-partition arrays elementwise.
val cSums2 = time(rows.mapPartitions(it =>
  Array(it.foldLeft(Array.fill(10000)(0)) { (acc, nn) =>
    nn.foreach(tt => acc(tt._1) += 1); acc
  }).iterator
).reduce((r1, r2) => r1.zip(r2).map(tt => tt._1 + tt._2)))
// Elapsed time 206.962364 msecs

These are the best times over a small number of runs, but the average case showed the same behavior. The merge reduction I had suggested (sketched below) was not even close, which doesn't surprise me much on second thought. At sparsity = 0.01, the times are 2447 vs. 394 msecs.

Lesson 1: You would care about this in an iterative algorithm, but not in a one-off application.
Lesson 2: Shuffle is slow in comparison, even for a small number of elements.
Lesson 3: Spark would be even cooler with highly optimized reduce and broadcast.
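For reference, here is a minimal sketch of the mergeAdd I had in mind (not the exact code I timed; it assumes each sparse row is an Array[(Int, Int)] of (column index, count) pairs sorted by index, which is what zipWithIndex gives you):

// Merge two sorted sparse rows, summing counts where column indices match.
def mergeAdd(a: Array[(Int, Int)], b: Array[(Int, Int)]): Array[(Int, Int)] = {
  val out = scala.collection.mutable.ArrayBuffer[(Int, Int)]()
  var i = 0
  var j = 0
  while (i < a.length && j < b.length) {
    if (a(i)._1 < b(j)._1) { out += a(i); i += 1 }
    else if (a(i)._1 > b(j)._1) { out += b(j); j += 1 }
    else { out += ((a(i)._1, a(i)._2 + b(j)._2)); i += 1; j += 1 }
  }
  out ++= a.drop(i) // append whichever row still has a tail left
  out ++= b.drop(j)
  out.toArray
}

You would call it as rows.reduce(mergeAdd(_, _)). Each merge is linear in the two row lengths, but the intermediate rows get denser as the reduction proceeds, which is consistent with it losing badly in the timings above.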
On Fri, Nov 8, 2013 at 7:01 PM, Philip Ogren <philip.og...@oracle.com> wrote:

> Thank you for the pointers. I'm not sure I was able to fully understand
> either of your suggestions, but here is what I came up with. I started
> with Tom's code, but I think I ended up borrowing from Patrick's
> suggestion too. Any thoughts about my updated solution are more than
> welcome! I added local variable types for clarity.
>
> def countPopulatedColumns(tsv: RDD[String]): RDD[(Int, Int)] = {
>   // split by tab and zip with index to give (column value, column index) pairs
>   val sparse: RDD[(String, Int)] = tsv.flatMap(line => line.split("\t").zipWithIndex)
>   // filter out all the zero-length values
>   val dense: RDD[(String, Int)] = sparse.filter(valueIndex => valueIndex._1.length > 0)
>   // map each column index to one and do the usual reduction
>   dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_ + _)
> }
>
> Of course, this can be condensed to a single line, but it doesn't seem as
> easy to read as the more verbose code above. Write-once code like the
> following is why I never liked Perl....
>
> def cpc(tsv: RDD[String]): RDD[(Int, Int)] = {
>   tsv.flatMap(_.split("\t").zipWithIndex).filter(ci => ci._1.length > 0).map(ci => (ci._2, 1)).reduceByKey(_ + _)
> }
>
> Thanks,
> Philip
>
> On 11/8/2013 2:41 PM, Patrick Wendell wrote:
>> Hey Tom,
>>
>> reduceByKey will reduce locally on all the nodes, so there won't be
>> any data movement except to combine totals at the end.
>>
>> - Patrick
>>
>> On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek <minnesota...@gmail.com> wrote:
>>> Your example requires each row to be exactly the same length, since
>>> zipped will truncate to the shorter of its two arguments.
>>>
>>> The second solution is elegant, but reduceByKey involves flying a bunch
>>> of data around to group the keys. I suspect it would be a lot slower.
>>> But you could save yourself from adding up a bunch of zeros:
>>>
>>> val sparseRows = spark.textFile("myfile.tsv").map(line =>
>>>   line.split("\t").zipWithIndex.filter(_._1.length > 0))
>>> sparseRows.reduce(mergeAdd(_, _))
>>>
>>> You'll have to write a mergeAdd function. This might not be any faster,
>>> but it does allow variable-length rows.
>>>
>>> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>>>> It would be a bit more straightforward to write it like this:
>>>>
>>>> val columns = [same as before]
>>>>
>>>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each column).reduceByKey(_ + _)
>>>>
>>>> Basically, look at each row and emit several records using flatMap.
>>>> Each record has an ID for the column (maybe its index) and a flag for
>>>> whether it's present.
>>>>
>>>> Then you reduce by key to get the per-column count. Then you can
>>>> collect at the end.
>>>>
>>>> - Patrick
>>>>
>>>> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <philip.og...@oracle.com> wrote:
>>>>> Hi Spark coders,
>>>>>
>>>>> I wrote my first little Spark job that takes columnar data and counts
>>>>> up how many times each column is populated in an RDD. Here is the
>>>>> code I came up with:
>>>>>
>>>>> // RDD of List[String] corresponding to tab-delimited values
>>>>> val columns = spark.textFile("myfile.tsv").map(line => line.split("\t").toList)
>>>>> // RDD of List[Int] corresponding to populated columns
>>>>> // (1 for populated and 0 for not populated)
>>>>> val populatedColumns = columns.map(row => row.map(column => if (column.length > 0) 1 else 0))
>>>>> // List[Int] containing the sums of the 1's in each column
>>>>> val counts = populatedColumns.reduce((row1, row2) => (row1, row2).zipped.map(_ + _))
>>>>>
>>>>> Any thoughts about the fitness of this code snippet? I'm a little
>>>>> annoyed by creating an RDD full of 1's and 0's in the second line.
>>>>> The if statement feels awkward too. I was happy to find the zipped
>>>>> method for the reduce step. Any feedback you might have on how to
>>>>> improve this code is appreciated. I'm a newbie to both Scala and Spark.
>>>>>
>>>>> Thanks,
>>>>> Philip