Thank you for the pointers. I'm not sure I was able to fully understand
either of your suggestions but here is what I came up with. I started
with Tom's code but I think I ended up borrowing from Patrick's
suggestion too. Any thoughts about my updated solution are more than
welcome! I added local variable types for clarity.
def countPopulatedColumns(tsv: RDD[String]): RDD[(Int, Int)] = {
  // split by tab and zip with index to give (column value, column index) pairs
  val sparse: RDD[(String, Int)] = tsv.flatMap(line => line.split("\t").zipWithIndex)
  // filter out all the zero-length values
  val dense: RDD[(String, Int)] = sparse.filter(valueIndex => valueIndex._1.length > 0)
  // map each column index to one and do the usual reduction
  dense.map(valueIndex => (valueIndex._2, 1)).reduceByKey(_ + _)
}
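For completeness, here is roughly how I am calling it end to end (reusing the same placeholder file name from my original message):

  val counts: RDD[(Int, Int)] = countPopulatedColumns(spark.textFile("myfile.tsv"))
  // print each (column index, populated count) pair on the driver
  counts.collect().foreach(pair => println(pair._1 + "\t" + pair._2))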
Of course, this can be condensed to a single line, but it doesn't seem as
easy to read as the more verbose code above. Write-once code like the
following is why I never liked Perl....
def cpc(tsv: RDD[String]): RDD[(Int, Int)] = {
  tsv.flatMap(_.split("\t").zipWithIndex).filter(ci => ci._1.length > 0).map(ci => (ci._2, 1)).reduceByKey(_ + _)
}
Thanks,
Philip
On 11/8/2013 2:41 PM, Patrick Wendell wrote:
Hey Tom,
reduceByKey will reduce locally on all the nodes, so there won't be
any data movement except to combine totals at the end.
- Patrick
On Fri, Nov 8, 2013 at 1:35 PM, Tom Vacek <[email protected]> wrote:
Your example requires each row to be exactly the same length, since zipped
will truncate to the shorter of its two arguments.
The second solution is elegant, but reduceByKey involves flying a bunch of
data around to sort the keys. I suspect it would be a lot slower. But you
could save yourself from adding up a bunch of zeros:
val sparseRows = spark.textFile("myfile.tsv").map(line =>
  line.split("\t").zipWithIndex.filter(_._1.length > 0))
sparseRows.reduce(mergeAdd(_, _))
You'll have to write a mergeAdd function. This might not be any faster, but
it does allow variable length rows.
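For example, mergeAdd could look roughly like this (just an untested sketch; note it first maps each sparse row to a Map of column index -> count so that the reduce has a single type to merge):

  // add two per-column count maps, summing counts for matching column indices
  def mergeAdd(a: Map[Int, Int], b: Map[Int, Int]): Map[Int, Int] =
    (a.keySet ++ b.keySet).map(k => (k, a.getOrElse(k, 0) + b.getOrElse(k, 0))).toMap

  // turn each sparse row into per-column counts of 1, then merge all the rows
  val counts = sparseRows.map(row => row.map(vi => (vi._2, 1)).toMap).reduce(mergeAdd)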
On Fri, Nov 8, 2013 at 3:23 PM, Patrick Wendell <[email protected]> wrote:
It would be a bit more straightforward to write it like this:
val columns = [same as before]
val counts = columns.flatMap(emit (col_id, 0 or 1) for each column).reduceByKey(_ + _)
Basically look at each row and emit several records using flatMap.
Each record has an ID for the column (maybe its index) and a flag for
whether it's present.
Then you reduce by key to get the per-column count. Then you can
collect at the end.
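In concrete form that might look something like this (just a sketch, reusing the columns RDD of List[String] from the original message):

  // emit (column index, 1 or 0) for every column of every row, then sum per column
  val counts = columns.flatMap(row =>
    row.zipWithIndex.map(vi => (vi._2, if (vi._1.length > 0) 1 else 0))
  ).reduceByKey(_ + _)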
- Patrick
On Fri, Nov 8, 2013 at 1:15 PM, Philip Ogren <[email protected]> wrote:
Hi Spark coders,
I wrote my first little Spark job that takes columnar data and counts up
how many times each column is populated in an RDD. Here is the code I
came up with:
// RDD of List[String] corresponding to tab delimited values
val columns = spark.textFile("myfile.tsv").map(line => line.split("\t").toList)
// RDD of List[Int] corresponding to populated columns (1 for populated and 0 for not populated)
val populatedColumns = columns.map(row => row.map(column => if (column.length > 0) 1 else 0))
// List[Int] containing the sums of the 1's in each column
val counts = populatedColumns.reduce((row1, row2) => (row1, row2).zipped.map(_ + _))
Any thoughts about the fitness of this code snippet? I'm a little annoyed
by creating an RDD full of 1's and 0's in the second line. The if statement
feels awkward too. I was happy to find the zipped method for the reduce
step. Any feedback you might have on how to improve this code is
appreciated. I'm a newbie to both Scala and Spark.
Thanks,
Philip