After googling around I realize how ridiculous my question is :( Being new to Spark, for some reason I thought all of the basic "stats" functions were implemented in a first-class way out of the box over the MapReduce framework... oops! Sorry for the spam :)
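The penny that dropped: reduceByKey() hands the supplied function two values at a time and needs an associative combiner, which is why sum-like functions work but mean() can't be passed in directly -- a mean can't be computed pairwise. Here's a rough, untested sketch of the workaround (carrying (sum, count) pairs through the reduce and dividing at the end), using the sparkData_map2 RDD from my message below and assuming this SparkR build exposes lapply() as the per-element map over an RDD:

# Map each (key, value) pair to (key, c(running_sum, count)) so the
# combine step only ever has to add two numeric vectors together.
sums_and_counts <- lapply(sparkData_map2, function(kv) {
  list(kv[[1]], c(kv[[2]], 1))
})

# Elementwise addition of two (sum, count) vectors is associative, so it
# is safe to hand to reduceByKey(), unlike mean().
totals <- reduceByKey(sums_and_counts, function(a, b) a + b, 2L)

# Divide the per-key sum by the per-key count to get the per-key mean.
means <- lapply(totals, function(kv) {
  list(kv[[1]], kv[[2]][1] / kv[[2]][2])
})

take(means, 5L)

Median is the awkward one, since it can't be combined pairwise like that, so groupByKey() is probably still the way to go there.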
On Monday, February 3, 2014, Justin Lent <[email protected]> wrote:

> So I've been struggling with this for a bit using SparkR. I can't even
> seem to write a basic mean/median function in R that works when passing
> it into reduceByKey() for my very simple dataset. I can pass in R's base
> function 'sum' and it works just fine. Looking in help shows that the
> signature for sum is sum(...) while mean and median are mean(x), so that
> difference is why it's not working -- I just can't for the life of me
> write wrappers for mean/median that work with reduceByKey(). I've pasted
> in below a take(5) of the RDD I'm trying to use reduceByKey() with, to
> show the structure (it's just a list of lists). I've also pasted in the
> result of doing a groupByKey(), because presumably this is the same data
> structure that reduceByKey() gets before it does its reducing, right?
>
> FWIW, I've written this function that works fine when run on the results
> of a groupByKey() operation, so I must be close to what it should be to
> work in a reduceByKey():
>
> avg_g <- function(resultFromGroupByKey){
>   return( lapply( lapply( lapply( resultFromGroupByKey, "[[", 2 ), unlist ), mean ) )
> }
>
> My best guess at how to convert this to work with reduceByKey() is the
> function below, since it works when I call it on a single Value from the
> (K,V) pairs returned by groupByKey(). Unfortunately, using it in
> reduceByKey() results in a Java NPE.
>
> avg <- function(x){ return( mean(unlist(x, recursive=FALSE)) ) }
>
> Any help would be appreciated... and here comes the cut/paste of the
> data, and the NPE trace.
>
>
> THE DATA (take 5; just assume the keys in the whole dataset go from A to Z)
>
> [[1]]
> [[1]][[1]]
> [1] "A"
>
> [[1]][[2]]
> [1] 136
>
>
> [[2]]
> [[2]][[1]]
> [1] "A"
>
> [[2]][[2]]
> [1] 136
>
>
> [[3]]
> [[3]][[1]]
> [1] "A"
>
> [[3]][[2]]
> [1] 136
>
>
> [[4]]
> [[4]][[1]]
> [1] "A"
>
> [[4]][[2]]
> [1] 136
>
>
> [[5]]
> [[5]][[1]]
> [1] "A"
>
> [[5]][[2]]
> [1] 136
>
>
> THE DATA AFTER GROUPBYKEY()
>
> [[1]]
> [[1]][[1]]
> [1] "B"
>
> [[1]][[2]]
> [[1]][[2]][[1]]
> [1] 136
>
> [[1]][[2]][[2]]
> [1] 136
>
> [[1]][[2]][[3]]
> [1] 136
>
> [[1]][[2]][[4]]
> [1] 136
>
> [[1]][[2]][[5]]
> [1] 136
>
> [[1]][[2]][[6]]
> [1] 136
>
> [[1]][[2]][[7]]
> [1] 136
>
> [[1]][[2]][[8]]
> [1] 136
>
>
> take( reduceByKey( sparkData_map2, avg, 2L ), 5 )
> Error in (function (x)  : unused argument (136)
> Calls: do.call ... FUN -> lapply -> lapply -> FUN -> do.call -> <Anonymous>
> Execution halted
> 14/02/03 16:37:47 ERROR Executor: Exception in task ID 1407
> java.lang.NullPointerException
>     at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:116)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     at java.lang.Thread.run(Thread.java:695)
> 14/02/03 16:37:47 WARN TaskSetManager: Lost TID 1407 (task 1415.0:0)
> 14/02/03 16:37:47 WARN TaskSetManager: Loss was due to java.lang.NullPointerException
> java.lang.NullPointerException
>     at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:116)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     at java.lang.Thread.run(Thread.java:695)
> 14/02/03 16:37:47 ERROR TaskSetManager: Task 1415.0:0 failed 1 times; aborting job
> Error in .jcall(rdd@jrdd, "[Ljava/util/List;", "collectPartitions", .jarray(as.integer(index))) :
>   org.apache.spark.SparkException: Job aborted: Task 1415.0:0 failed 1 times
>   (most recent failure: Exception failure: java.lang.NullPointerException)

--
Sent from my iPhone
