after googling around I realize how ridiculous my question is :(

being new to Spark, for some reason I thought all of the basic "stats"
function were implemented in a first class way out of the box over the
mapreduce framework... oops! sorry for the spam :)



On Monday, February 3, 2014, Justin Lent <[email protected]> wrote:

> So i've been struggling with this now for a bit using SparkR.. I can't
> even seem to write a basic mean/median function in R that works when
> passing it into reduceByKey() for my very simple dataset. I can pass
> in R's base function 'sum' and it works just fine. Looking in help
> shows that the signature for sum is sum(...)  while mean and median
> are mean(x), so that difference is why it's not working -- i just
> can't for the life of me write wrappers for mean/median to work with
> reduceByKey(). I've pasted in below a take(5) of the RDD i'm trying to
> use reduceByKey with to show the structure (its just a list of list).
> As well, i also have pasted in the result of doing a groupByKey()
> because presumably this is the same data structure the reduceByKey()
> gets before it does its reducing right?
>
> FWIW, i've written this function that works fine if run on the results
> of a groupByKey() operation so i must be close to what it should be to
> work in a reduceByKey():
>
> avg_g <- function(resultFromGroupByKey){
> return( lapply( lapply( lapply( resultFromGroupByKey, "[[", 2 ) ,
> unlist ) , mean ) )
> }
>
> my best guess as to how to convert this to working with reduceByKey is
> this below, since it works when i call a single Value from the (K,V)
> pair returned from groupByKey(). Unforutnately using it in reduceByKey
> results in a java NPE.
>
> avg <- function(x){ return( mean(unlist(x,recursive=FALSE)) ) }
>
>
> Any help would be appreciated.... and here comes the cut/paste of the
> data, and the NPE trace
>
>
> THE DATA  (take 5, just assume the keys in the whole dataset go from A to
> Z)
>
> [[1]]
> [[1]][[1]]
> [1] "A"
>
> [[1]][[2]]
> [1] 136
>
>
> [[2]]
> [[2]][[1]]
> [1] "A"
>
> [[2]][[2]]
> [1] 136
>
>
> [[3]]
> [[3]][[1]]
> [1] "A"
>
> [[3]][[2]]
> [1] 136
>
>
> [[4]]
> [[4]][[1]]
> [1] "A"
>
> [[4]][[2]]
> [1] 136
>
>
> [[5]]
> [[5]][[1]]
> [1] "A"
>
> [[5]][[2]]
> [1] 136
>
>
>
> THE DATA AFTER GROUPBYKEY()
>
> [[1]]
> [[1]][[1]]
> [1] "B"
>
> [[1]][[2]]
> [[1]][[2]][[1]]
> [1] 136
>
> [[1]][[2]][[2]]
> [1] 136
>
> [[1]][[2]][[3]]
> [1] 136
>
> [[1]][[2]][[4]]
> [1] 136
>
> [[1]][[2]][[5]]
> [1] 136
>
> [[1]][[2]][[6]]
> [1] 136
>
> [[1]][[2]][[7]]
> [1] 136
>
> [[1]][[2]][[8]]
> [1] 136
>
>
>
>
>
> > take( reduceByKey( sparkData_map2, avg, 2L ) , 5 )
> Error in (function (x)  : unused argument (136)
> Calls: do.call ... FUN -> lapply -> lapply -> FUN -> do.call -> <Anonymous>
> Execution halted
> 14/02/03 16:37:47 ERROR Executor: Exception in task ID 1407
> java.lang.NullPointerException
> at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:116)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> at org.apache.spark.scheduler.Task.run(Task.scala:53)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:695)
> 14/02/03 16:37:47 WARN TaskSetManager: Lost TID 1407 (task 1415.0:0)
> 14/02/03 16:37:47 WARN TaskSetManager: Loss was due to
> java.lang.NullPointerException
> java.lang.NullPointerException
> at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:116)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> at org.apache.spark.scheduler.Task.run(Task.scala:53)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:695)
> 14/02/03 16:37:47 ERROR TaskSetManager: Task 1415.0:0 failed 1 times;
> aborting job
> Error in .jcall(rdd@jrdd, "[Ljava/util/List;", "collectPartitions",
> .jarray(as.integer(index))) :
>   org.apache.spark.SparkException: Job aborted: Task 1415.0:0 failed 1
> times (most recent failure: Exception failure:
> java.lang.NullPointerException)
>


-- 
** Sent from my iPhone *

Reply via email to