Re: code review - counting populated columns

2013-11-10 Thread Patrick Wendell
Hey Philip,

Your code is exactly what I was suggesting. I didn't explain it
clearly: when I said "emit XX", I just meant figure out how to do XX
and return the result; there isn't actually a function called 'emit'.

In your case, the correct way to do it was using zipWithIndex... I
just couldn't remember off the top of my head how to do that.

- Patrick


code review - counting populated columns

2013-11-08 Thread Philip Ogren

Hi Spark coders,

I wrote my first little Spark job that takes columnar data and counts up 
how many times each column is populated in an RDD.  Here is the code I 
came up with:


//RDD of List[String] corresponding to tab delimited values
val columns = spark.textFile("myfile.tsv").map(line =>
  line.split("\t").toList)

//RDD of List[Int] corresponding to populated columns (1 for populated and 0 for not populated)
val populatedColumns = columns.map(row => row.map(column =>
  if (column.length > 0) 1 else 0))

//List[Int] contains sums of the 1's in each column
val counts = populatedColumns.reduce((row1, row2) =>
  (row1, row2).zipped.map(_ + _))


Any thoughts about the fitness of this code snippet?  I'm a little 
annoyed by creating an RDD full of 1's and 0's in the second line.  The 
if statement feels awkward too.  I was happy to find the zipped method 
for the reduce step.  Any feedback you might have on how to improve this 
code is appreciated.  I'm a newbie to both Scala and Spark.


Thanks,
Philip



Re: code review - counting populated columns

2013-11-08 Thread Philip Ogren
Where does 'emit' come from?  I don't see it in the Scala or Spark 
apidocs (though I don't feel very deft at searching either!)


Thanks,
Philip

On 11/8/2013 2:23 PM, Patrick Wendell wrote:

It would be a bit more straightforward to write it like this:

val columns = [same as before]

val counts = columns.flatMap(emit (col_id, 0 or 1) for each
column).reduceByKey(_+ _)

Basically look at each row and emit several records using flatMap.
Each record has an ID for the column (maybe its index) and a flag for
whether it's present.

Then you reduce by key to get the per-column count. Then you can
collect at the end.

- Patrick
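
A minimal sketch of what that pseudocode might look like in concrete Scala, assuming columns is the RDD[List[String]] from the original post and using the zipWithIndex position as the col_id (there is no actual 'emit' function, as Patrick clarifies later in the thread):

 //one (columnIndex, flag) record per column of every row; the flag is 1 when
 //the column value is non-empty and 0 otherwise, so the per-key sum is the
 //populated count for that column
 val counts = columns.flatMap { row =>
   row.zipWithIndex.map { case (value, colId) =>
     (colId, if (value.length > 0) 1 else 0)
   }
 }.reduceByKey(_ + _)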


Re: code review - counting populated columns

2013-11-08 Thread Tom Vacek
Your example requires each row to be exactly the same length, since zipped
will truncate to the shorter of its two arguments.

The second solution is elegant, but reduceByKey involves flying a bunch of
data around to sort the keys.  I suspect it would be a lot slower.  But you
could save yourself from adding up a bunch of zeros:

 val sparseRows = spark.textFile("myfile.tsv").map(line =>
   line.split("\t").zipWithIndex.filter(_._1.length > 0))
 sparseRows.reduce(mergeAdd(_, _))

You'll have to write a mergeAdd function.  This might not be any faster,
but it does allow variable length rows.
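
One possible shape for such a mergeAdd, sketched here assuming each row is first converted to a Map of column index to count (e.g. with .map(vi => (vi._2, 1)).toMap on the (value, index) pairs above); this is only an illustration, not code from the thread:

 //hypothetical mergeAdd: combine two per-row maps of columnIndex -> count,
 //summing the counts for any index that appears in both rows
 def mergeAdd(a: Map[Int, Int], b: Map[Int, Int]): Map[Int, Int] =
   b.foldLeft(a) { case (acc, (idx, cnt)) =>
     acc.updated(idx, acc.getOrElse(idx, 0) + cnt)
   }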



Re: code review - counting populated columns

2013-11-08 Thread Tom Vacek
Messed up.  Should be

 val sparseRows = spark.textFile("myfile.tsv").map(line =>
   line.split("\t").zipWithIndex.flatMap(tt => if (tt._1.length > 0) Some((tt._2, 1)) else None))

Then reduce with a mergeAdd.




Re: code review - counting populated columns

2013-11-08 Thread Patrick Wendell
Hey Tom,

reduceByKey will reduce locally on all the nodes, so there won't be
any data movement except to combine totals at the end.

- Patrick
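
Roughly speaking, reduceByKey(_ + _) behaves like the combineByKey call sketched below, where mergeValue runs inside each map task before any data is shuffled and mergeCombiners only combines the per-partition partial totals. This is an illustrative expansion, not the actual Spark source, and assumes pairs is an RDD[(Int, Int)] of (columnIndex, 1) records:

 //sketch of the map-side combine behind reduceByKey
 val counts = pairs.combineByKey(
   (v: Int) => v,                 //createCombiner: start a per-key total
   (acc: Int, v: Int) => acc + v, //mergeValue: applied locally, before the shuffle
   (a: Int, b: Int) => a + b      //mergeCombiners: add the per-partition partials
 )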



Re: code review - counting populated columns

2013-11-08 Thread Philip Ogren
Thank you for the pointers.  I'm not sure I was able to fully understand 
either of your suggestions but here is what I came up with.  I started 
with Tom's code but I think I ended up borrowing from Patrick's 
suggestion too.  Any thoughts about my updated solution are more than 
welcome!  I added local variable types for clarity.


  def countPopulatedColumns(tsv: RDD[String]): RDD[(Int, Int)] = {
    //split by tab and zip with index to give (column value, column index) pairs
    val sparse: RDD[(String, Int)] = tsv.flatMap(line => line.split("\t").zipWithIndex)

    //filter out all the zero length values
    val dense: RDD[(String, Int)] = sparse.filter(valueIndex => valueIndex._1.length > 0)

    //map each column index to one and do the usual reduction
    dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_ + _)
  }

Of course, this can be condensed to a single line, but it doesn't seem as
easy to read as the more verbose code above.  Write-once code like the
following is why I never liked Perl:


  def cpc(tsv: RDD[String]): RDD[(Int, Int)] = {
    tsv.flatMap(_.split("\t").zipWithIndex).filter(ci => ci._1.length > 0).map(ci => (ci._2, 1)).reduceByKey(_ + _)
  }
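
A minimal usage sketch for the function above, assuming a SparkContext named sc and a hypothetical input path:

 //hypothetical driver code: count populated columns in a TSV and print them
 val tsv = sc.textFile("myfile.tsv")
 countPopulatedColumns(tsv).collect.sortBy(_._1).foreach { case (col, n) =>
   println("column " + col + ": " + n + " populated rows")
 }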

Thanks,
Philip




Re: code review - counting populated columns

2013-11-08 Thread Tom Vacek
Patrick, you got me thinking, but I'm sticking to my opinion that
reduceByKey should be avoided if possible.  I tried some timings:

def time[T](code: => T) = {
  val t0 = System.nanoTime: Double
  val res = code
  val t1 = System.nanoTime: Double
  println("Elapsed time " + (t1 - t0) / 1e6 + " msecs")
  res
}
val sparsity = .001
val rows = sc.parallelize(1 to 1000).mapPartitionsWithIndex( (id, it) =>
  { val rng = new scala.util.Random(id + 42)
    it.map(row => (0 until 1).filter(i => rng.nextDouble > 1 - sparsity).map(i => (i, 1))) }
).map(_.toArray).cache
val rowsFlat = rows.flatMap(rr => rr).cache

rows.count
rowsFlat.count

val cSums1 = time(rowsFlat.reduceByKey(_ + _).collect.sortBy(_._1))
//Elapsed time 725.394917 msecs

val cSums2 = time( rows.mapPartitions(it =>
  Array(it.foldLeft(Array.fill(1)(0))((acc, nn) =>
    { nn.foreach(tt => acc(tt._1) += 1); acc })).iterator).reduce( (r1, r2) =>
  r1.zip(r2).map(tt => tt._1 + tt._2)))
//Elapsed time 206.962364 msecs

These are the best times over a small number of runs, but the average case
showed the same behavior.
The merge reduction I had suggested was not even close, which doesn't
surprise me much on second thought.

At sparsity = .01, the times are 2447 vs. 394 msecs.

Lesson 1: You would care about this in an iterative algorithm, but not in a
one-off application.
Lesson 2: Shuffle is slow in comparison, even for a small number of
elements.
Lesson 3: Spark would be even cooler with highly optimized reduce and
broadcast.
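
For completeness, a sketch of how the mapPartitions + foldLeft pattern above could be applied to the original TSV problem, assuming the tsv: RDD[String] input from Philip's countPopulatedColumns and a known, hypothetical column count numCols; this is an illustration of the idea, not code from the thread:

 //per-partition column counts in a local array, then one reduce to add the
 //partial arrays together (no shuffle by key)
 val numCols = 100  //assumed maximum number of columns
 val columnCounts: Array[Int] = tsv.mapPartitions { it =>
   val acc = Array.fill(numCols)(0)
   it.foreach { line =>
     line.split("\t").zipWithIndex.foreach { case (value, i) =>
       if (i < numCols && value.length > 0) acc(i) += 1
     }
   }
   Iterator(acc)
 }.reduce((a, b) => a.zip(b).map(t => t._1 + t._2))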


