Hey Philip,

Your code is exactly what I was suggesting. I didn't explain it clearly: when I said "emit XX", I just meant "figure out how to do XX and return the result"; there isn't actually a function called 'emit'.
In your case, the correct way to do it was using zipWithIndex... I just couldn't remember off the top of my head how to do that.

- Patrick

On Sat, Nov 9, 2013 at 4:41 AM, Tom Vacek <[email protected]> wrote:
> Patrick, you got me thinking, but I'm sticking to my opinion that
> reduceByKey should be avoided if possible. I tried some timings:
>
> def time[T](code: => T) = {
>   val t0 = System.nanoTime: Double
>   val res = code
>   val t1 = System.nanoTime: Double
>   println("Elapsed time " + (t1 - t0) / 1000000.0 + " msecs")
>   res
> }
>
> val sparsity = .001
> val rows = sc.parallelize(1 to 10000000).mapPartitionsWithIndex((id, it) => {
>   val rng = new scala.util.Random(id + 42)
>   it.map(row => (0 until 10000).filter(i => rng.nextDouble > 1 - sparsity).map(i => (i, 1)))
> }).map(_.toArray).cache
> val rowsFlat = rows.flatMap(rr => rr).cache
>
> rows.count
> rowsFlat.count
>
> val cSums1 = time(rowsFlat.reduceByKey(_ + _).collect.sortBy(_._1))
> // Elapsed time 725.394917 msecs
>
> val cSums2 = time(rows.mapPartitions(it =>
>   Array(it.foldLeft(Array.fill(10000)(0))((acc, nn) => {
>     nn.foreach(tt => acc(tt._1) += 1); acc
>   })).iterator).reduce((r1, r2) => r1.zip(r2).map(tt => tt._1 + tt._2)))
> // Elapsed time 206.962364 msecs
>
> These are the best times over a small number of runs, but the average case
> showed the same behavior. The merge reduction I had suggested was not even
> close, which doesn't surprise me much on second thought.
>
> At sparsity = .01, the times are 2447 vs. 394.
>
> Lesson 1: You would care about this in an iterative algorithm, but not in
> a one-off application.
> Lesson 2: Shuffle is slow in comparison, even for a small number of
> elements.
> Lesson 3: Spark would be even cooler with highly optimized reduce and
> broadcast.
>
> On Fri, Nov 8, 2013 at 7:01 PM, Philip Ogren <[email protected]> wrote:
>> Thank you for the pointers. I'm not sure I was able to fully understand
>> either of your suggestions, but here is what I came up with.
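Philip's zipWithIndex pipeline quoted below can be tried without a cluster. This is a plain-Scala miniature that stands in a List for the RDD and a groupBy-and-sum for reduceByKey; the sample lines are made up for illustration:

```scala
// Count how many rows populate each column, for tab-separated lines.
// Mirrors the RDD pipeline (flatMap + zipWithIndex + filter + count by key)
// using plain Scala collections on hypothetical sample data.
val lines = List("a\tb\t", "\tb\tc", "a\t\tc")

val counts: Map[Int, Int] = lines
  .flatMap(_.split("\t").zipWithIndex)     // (cellValue, columnIndex) pairs
  .filter { case (value, _) => value.nonEmpty } // drop empty cells
  .map { case (_, idx) => (idx, 1) }
  .groupBy(_._1)                           // groupBy + sum plays the role of reduceByKey
  .map { case (idx, ones) => (idx, ones.map(_._2).sum) }

// columns 0, 1, and 2 are each populated in two of the three lines
```

Note that `String.split("\t")` drops trailing empty strings, so a row ending in tabs yields fewer pairs; that is harmless here because empty cells are filtered out anyway, but `split("\t", -1)` preserves them if the distinction ever matters.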
>> I started with Tom's code but I think I ended up borrowing from Patrick's
>> suggestion too. Any thoughts about my updated solution are more than
>> welcome! I added local variable types for clarity.
>>
>> def countPopulatedColumns(tsv: RDD[String]): RDD[(Int, Int)] = {
>>   // split by tab and zip with index to give (column value, column index) pairs
>>   val sparse: RDD[(String, Int)] = tsv.flatMap(line => line.split("\t").zipWithIndex)
>>   // filter out all the zero-length values
>>   val dense: RDD[(String, Int)] = sparse.filter(valueIndex => valueIndex._1.length > 0)
>>   // map each column index to one and do the usual reduction
>>   dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_ + _)
>> }
>>
>> Of course, this can be condensed to a single line, but it doesn't seem as
>> easy to read as the more verbose code above. Write-once code like the
>> following is why I never liked Perl...
>>
>> def cpc(tsv: RDD[String]): RDD[(Int, Int)] = {
>>   tsv.flatMap(_.split("\t").zipWithIndex).filter(ci => ci._1.length > 0)
>>     .map(ci => (ci._2, 1)).reduceByKey(_ + _)
>> }
>>
>> Thanks,
>> Philip
>>
>> On 11/8/2013 2:41 PM, Patrick Wendell wrote:
>>> Hey Tom,
>>>
>>> reduceByKey will reduce locally on all the nodes, so there won't be
>>> any data movement except to combine totals at the end.
>>>
>>> - Patrick
>>>
>>> On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek <[email protected]> wrote:
>>>> Your example requires each row to be exactly the same length, since
>>>> zipped will truncate to the shorter of its two arguments.
>>>>
>>>> The second solution is elegant, but reduceByKey involves flying a bunch
>>>> of data around to sort the keys. I suspect it would be a lot slower.
>>>> But you could save yourself from adding up a bunch of zeros:
>>>>
>>>> val sparseRows = spark.textFile("myfile.tsv").map(line =>
>>>>   line.split("\t").zipWithIndex.filter(_._1.length > 0))
>>>> sparseRows.reduce(mergeAdd(_, _))
>>>>
>>>> You'll have to write a mergeAdd function. This might not be any faster,
>>>> but it does allow variable-length rows.
>>>>
>>>> On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <[email protected]> wrote:
>>>>> It would be a bit more straightforward to write it like this:
>>>>>
>>>>> val columns = [same as before]
>>>>>
>>>>> val counts = columns.flatMap(emit (col_id, 0 or 1) for each
>>>>>   column).reduceByKey(_ + _)
>>>>>
>>>>> Basically, look at each row and emit several records using flatMap.
>>>>> Each record has an ID for the column (maybe its index) and a flag for
>>>>> whether it's present.
>>>>>
>>>>> Then you reduce by key to get the per-column count. Then you can
>>>>> collect at the end.
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <[email protected]> wrote:
>>>>>> Hi Spark coders,
>>>>>>
>>>>>> I wrote my first little Spark job that takes columnar data and counts
>>>>>> up how many times each column is populated in an RDD. Here is the
>>>>>> code I came up with:
>>>>>>
>>>>>> // RDD of List[String] corresponding to tab-delimited values
>>>>>> val columns = spark.textFile("myfile.tsv").map(line =>
>>>>>>   line.split("\t").toList)
>>>>>> // RDD of List[Int] corresponding to populated columns
>>>>>> // (1 for populated and 0 for not populated)
>>>>>> val populatedColumns = columns.map(row => row.map(column =>
>>>>>>   if (column.length > 0) 1 else 0))
>>>>>> // List[Int] contains the sums of the 1's in each column
>>>>>> val counts = populatedColumns.reduce((row1, row2) =>
>>>>>>   (row1, row2).zipped.map(_ + _))
>>>>>>
>>>>>> Any thoughts about the fitness of this code snippet?
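Tom leaves mergeAdd unwritten. One plain-Scala sketch of what such a function might look like, under the assumption that it adds two sparse rows of (columnIndex, count) pairs, is:

```scala
// Merge two sparse rows of (columnIndex, count) pairs, summing the counts
// of indices that appear in both rows. A simple Map-based version; merging
// two index-sorted sequences directly would avoid the intermediate Map.
def mergeAdd(r1: Seq[(Int, Int)], r2: Seq[(Int, Int)]): Seq[(Int, Int)] = {
  val acc = scala.collection.mutable.Map.empty[Int, Int].withDefaultValue(0)
  (r1.iterator ++ r2.iterator).foreach { case (i, v) => acc(i) += v }
  acc.toSeq.sortBy(_._1)
}

// e.g. mergeAdd(Seq((0,1), (2,1)), Seq((2,1), (3,1)))
//      yields Seq((0,1), (2,2), (3,1))
```

Because the result keeps the same sparse (index, count) shape as its inputs, it composes with reduce exactly as Tom's sparseRows.reduce(mergeAdd(_, _)) line requires.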
>>>>>> I'm a little annoyed by creating an RDD full of 1's and 0's in the
>>>>>> second line. The if statement feels awkward too. I was happy to find
>>>>>> the zipped method for the reduce step. Any feedback you might have on
>>>>>> how to improve this code is appreciated. I'm a newbie to both Scala
>>>>>> and Spark.
>>>>>>
>>>>>> Thanks,
>>>>>> Philip
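The pattern behind Tom's winning timing earlier in the thread (fold each partition into one dense accumulator, then merge the few accumulators elementwise) can be mimicked with plain collections. Here each "partition" is just a List of sparse rows, which is an assumption made purely for illustration:

```scala
// Simulate per-partition aggregation: each "partition" is a list of sparse
// rows, each row a list of (columnIndex, 1) pairs (hypothetical data).
val nCols = 4
val partitions: List[List[List[(Int, Int)]]] = List(
  List(List((0, 1), (2, 1)), List((2, 1))), // partition 1: two rows
  List(List((1, 1), (3, 1)))                // partition 2: one row
)

// Fold each partition into a single dense Array[Int] accumulator,
// as the mapPartitions + foldLeft step does on each Spark worker...
val perPartition: List[Array[Int]] = partitions.map { part =>
  part.foldLeft(Array.fill(nCols)(0)) { (acc, row) =>
    row.foreach { case (i, v) => acc(i) += v }
    acc
  }
}

// ...then merge the few accumulators elementwise, as reduce does at the end.
val totals: Array[Int] = perPartition.reduce((a, b) => a.zip(b).map(t => t._1 + t._2))
```

The point of the pattern is that only one dense array per partition crosses the reduce step, rather than one (key, 1) record per populated cell crossing a shuffle, which is why it beat reduceByKey in Tom's timings.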
