Re: code review - counting populated columns
Hey Philip,

Your code is exactly what I was suggesting. I didn't explain it clearly: when I said "emit XX", I just meant figure out how to do XX and return the result; there isn't actually a function called 'emit'. In your case, the correct way to do it was using zipWithIndex... I just couldn't remember off the top of my head how to do that.

- Patrick

On Sat, Nov 9, 2013 at 4:41 AM, Tom Vacek minnesota...@gmail.com wrote:
Patrick, you got me thinking, but I'm sticking to my opinion that reduceByKey should be avoided if possible. I tried some timings: ...
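A side note on zipWithIndex, since it is the piece Patrick could not recall off the top of his head: it pairs each element of a collection with its position, which gives exactly the (value, column index) pairs being discussed. A tiny illustrative sketch (the sample line is made up):

"a\t\tc".split("\t").zipWithIndex
// Array((a,0), ("",1), (c,2)); the empty second field keeps its column index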
code review - counting populated columns
Hi Spark coders,

I wrote my first little Spark job that takes columnar data and counts up how many times each column is populated in an RDD. Here is the code I came up with:

//RDD of List[String] corresponding to tab delimited values
val columns = spark.textFile("myfile.tsv").map(line => line.split("\t").toList)
//RDD of List[Int] corresponding to populated columns (1 for populated and 0 for not populated)
val populatedColumns = columns.map(row => row.map(column => if (column.length > 0) 1 else 0))
//List[Int] contains sums of the 1's in each column
val counts = populatedColumns.reduce((row1, row2) => (row1, row2).zipped.map(_+_))

Any thoughts about the fitness of this code snippet? I'm a little annoyed by creating an RDD full of 1's and 0's in the second line. The if statement feels awkward too. I was happy to find the zipped method for the reduce step. Any feedback you might have on how to improve this code is appreciated. I'm a newbie to both Scala and Spark.

Thanks,
Philip
Re: code review - counting populated columns
Where does 'emit' come from? I don't see it in the Scala or Spark apidocs (though I don't feel very deft at searching either!)

Thanks,
Philip

On 11/8/2013 2:23 PM, Patrick Wendell wrote:
It would be a bit more straightforward to write it like this:

val columns = [same as before]
val counts = columns.flatMap(emit (col_id, 0 or 1) for each column).reduceByKey(_ + _)

Basically look at each row and emit several records using flatMap. Each record has an ID for the column (maybe its index) and a flag for whether it's present. Then you reduce by key to get the per-column count. Then you can collect at the end.

- Patrick

On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren philip.og...@oracle.com wrote:
Hi Spark coders, I wrote my first little Spark job that takes columnar data and counts up how many times each column is populated in an RDD. ...
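To make the pseudocode concrete: the "emit" step is just the function passed to flatMap returning one (columnIndex, flag) pair per field. An illustrative sketch of the pattern Patrick describes (not code from his mail), using zipWithIndex for the column id:

// one (columnIndex, flag) record per field: 1 if populated, 0 if empty
val counts = columns.flatMap(row =>
  row.zipWithIndex.map { case (value, i) => (i, if (value.length > 0) 1 else 0) }
).reduceByKey(_ + _)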
Re: code review - counting populated columns
Your example requires each row to be exactly the same length, since zipped will truncate to the shorter of its two arguments. The second solution is elegant, but reduceByKey involves flying a bunch of data around to sort the keys. I suspect it would be a lot slower. But you could save yourself from adding up a bunch of zeros:

val sparseRows = spark.textFile("myfile.tsv").map(line => line.split("\t").zipWithIndex.filter(_._1.length > 0))
sparseRows.reduce(mergeAdd(_, _))

You'll have to write a mergeAdd function. This might not be any faster, but it does allow variable length rows.

On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell pwend...@gmail.com wrote:
It would be a bit more straightforward to write it like this: ...
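Tom leaves mergeAdd as an exercise. One possible shape for it, assuming each row has already been turned into (columnIndex, count) pairs in index order (as in his follow-up correction), is sketched below; the name and signature are illustrative, not taken from his mail:

def mergeAdd(a: Array[(Int, Int)], b: Array[(Int, Int)]): Array[(Int, Int)] = {
  val out = scala.collection.mutable.ArrayBuffer[(Int, Int)]()
  var i = 0
  var j = 0
  while (i < a.length && j < b.length) {
    if (a(i)._1 == b(j)._1) { out += ((a(i)._1, a(i)._2 + b(j)._2)); i += 1; j += 1 }  // same column: add counts
    else if (a(i)._1 < b(j)._1) { out += a(i); i += 1 }  // column only present in a
    else { out += b(j); j += 1 }  // column only present in b
  }
  while (i < a.length) { out += a(i); i += 1 }
  while (j < b.length) { out += b(j); j += 1 }
  out.toArray
}

With that, something like rows.reduce(mergeAdd) would give the per-column totals without a shuffle, at the cost of building a merged array at every reduce step.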
Re: code review - counting populated columns
Messed up. Should be

val sparseRows = spark.textFile("myfile.tsv").map(line => line.split("\t").zipWithIndex.flatMap(tt => if (tt._1.length > 0) (tt._2, 1)))

Then reduce with a mergeAdd.

On Fri, Nov 8, 2013 at 3:35 PM, Tom Vacek minnesota...@gmail.com wrote:
Your example requires each row to be exactly the same length, since zipped will truncate to the shorter of its two arguments. ...
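As written, that flatMap body still will not compile: an if with no else yields Unit, which flatMap cannot flatten. One guess at the intended completion, using Option to emit zero or one pairs per field (this completion is mine, not Tom's):

val sparseRows = spark.textFile("myfile.tsv").map(line =>
  line.split("\t").zipWithIndex.flatMap(tt =>
    if (tt._1.length > 0) Some((tt._2, 1)) else None  // keep only populated columns as (index, 1)
  )
)

These rows could then be combined with a mergeAdd-style reduce as discussed above.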
Re: code review - counting populated columns
Hey Tom,

reduceByKey will reduce locally on all the nodes, so there won't be any data movement except to combine totals at the end.

- Patrick

On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek minnesota...@gmail.com wrote:
Your example requires each row to be exactly the same length, since zipped will truncate to the shorter of its two arguments. ...
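In other words, reduceByKey does a map-side combine: each partition collapses its own records per key before anything is shuffled, so only the per-partition partial sums cross the network. Roughly what that local step amounts to, written out by hand (illustrative only, not Spark's actual internals; records is assumed to be an RDD[(Int, Int)] of (columnIndex, 1) pairs):

val localSums = records.mapPartitions { it =>
  val sums = scala.collection.mutable.HashMap[Int, Int]()
  it.foreach { case (col, n) => sums(col) = sums.getOrElse(col, 0) + n }
  sums.iterator
}
// only these partial sums get shuffled and combined across partitions
val counts = localSums.reduceByKey(_ + _)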
Re: code review - counting populated columns
Thank you for the pointers. I'm not sure I was able to fully understand either of your suggestions, but here is what I came up with. I started with Tom's code, but I think I ended up borrowing from Patrick's suggestion too. Any thoughts about my updated solution are more than welcome! I added local variable types for clarity.

def countPopulatedColumns(tsv: RDD[String]): RDD[(Int, Int)] = {
  //split by tab and zip with index to give (column value, column index) pairs
  val sparse: RDD[(String, Int)] = tsv.flatMap(line => line.split("\t").zipWithIndex)
  //filter out all the zero length values
  val dense: RDD[(String, Int)] = sparse.filter(valueIndex => valueIndex._1.length > 0)
  //map each column index to one and do the usual reduction
  dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_+_)
}

Of course, this can be condensed to a single line, but it doesn't seem as easy to read as the more verbose code above. Write-once code like the following is why I never liked Perl:

def cpc(tsv: RDD[String]): RDD[(Int, Int)] = {
  tsv.flatMap(_.split("\t").zipWithIndex).filter(ci => ci._1.length > 0).map(ci => (ci._2, 1)).reduceByKey(_+_)
}

Thanks,
Philip

On 11/8/2013 2:41 PM, Patrick Wendell wrote:
Hey Tom, reduceByKey will reduce locally on all the nodes, so there won't be any data movement except to combine totals at the end. - Patrick ...
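A quick sanity check of countPopulatedColumns from the shell might look like this; the sample rows and the sc handle are assumptions, and the expected result is worked out by hand:

// three tab-delimited rows; column 1 is empty in two of them
val tsv = sc.parallelize(Seq("a\t\tb", "c\t\td", "e\tf\tg"))
countPopulatedColumns(tsv).collect.sortBy(_._1)
// expected: Array((0,3), (1,1), (2,3))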
Re: code review - counting populated columns
Patrick, you got me thinking, but I'm sticking to my opinion that reduceByKey should be avoided if possible. I tried some timings:

def time[T](code: => T) = {
  val t0 = System.nanoTime: Double
  val res = code
  val t1 = System.nanoTime: Double
  println("Elapsed time " + (t1 - t0) / 1000000.0 + " msecs")
  res
}

val sparsity = .001
val rows = sc.parallelize(1 to 1000).mapPartitionsWithIndex( (id, it) => {
  val rng = new scala.util.Random(id + 42)
  it.map(row => (0 until 1).filter(i => rng.nextDouble > 1 - sparsity).map(i => (i, 1)))
}).map(_.toArray).cache
val rowsFlat = rows.flatMap(rr => rr).cache
rows.count
rowsFlat.count

val cSums1 = time(rowsFlat.reduceByKey(_+_).collect.sortBy(_._1))
//Elapsed time 725.394917 msecs
val cSums2 = time(
  rows.mapPartitions(it =>
    Array(it.foldLeft(Array.fill(1)(0))((acc, nn) => { nn.foreach(tt => acc(tt._1) += 1); acc })).iterator
  ).reduce((r1, r2) => r1.zip(r2).map(tt => tt._1 + tt._2))
)
//Elapsed time 206.962364 msecs

These are the best times over a small number of runs, but the average case showed the same behavior. The merge reduction I had suggested was not even close, which doesn't surprise me much on second thought. At sparsity=.01, the times are 2447 v. 394.

Lesson 1: You would care about this in an iterative algorithm, but not in a one-off application.
Lesson 2: Shuffle is slow in comparison, even for a small number of elements.
Lesson 3: Spark would be even cooler with highly optimized reduce and broadcast.

On Fri, Nov 8, 2013 at 7:01 PM, Philip Ogren philip.og...@oracle.com wrote:
Thank you for the pointers. I'm not sure I was able to fully understand either of your suggestions but here is what I came up with. ...
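Tom's second timing is essentially a hand-rolled per-partition fold followed by an element-wise merge. The same pattern can also be written with RDD.aggregate; this is a sketch under the same assumptions as his benchmark (rows of (columnIndex, 1) pairs and a fixed column count, here called numCols):

val numCols = 1  // column count, matching the Array.fill width in the snippet above
val colSums = rows.aggregate(Array.fill(numCols)(0))(
  // seqOp: fold one row's (index, count) pairs into the partition's accumulator
  (acc, row) => { row.foreach { case (i, n) => acc(i) += n }; acc },
  // combOp: merge two per-partition accumulators element-wise
  (a, b) => a.zip(b).map { case (x, y) => x + y }
)

Whether this beats the explicit mapPartitions version would need the same kind of timing Tom ran; the point is only that the shuffle-free reduction does not have to be written by hand each time.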