These are just thoughts off the top of my head:

1) If the original R code runs in 3 seconds, you are unlikely to be able to
improve on that drastically with Spark.  Yes, Spark can run sub-second jobs,
but no matter what, don't expect Spark to get you into the < 10 millisecond
range.  While Spark has much lower overhead than Hadoop at running jobs,
there is definitely still significant overhead.  It's really meant to speed
up bigger computations.  (Still, I understand the desire to start with
something small to get a feel for it.)

2) Spark is doing a *lot* more work the way you have it coded now.  I'm not
entirely sure what is going on because I can't see what all the variables
mean.  But at the very least, the Spark code is dealing with different data,
since I see Strings are involved in the Spark code but not in the R code.

In the R code, can you tell me the meaning and size of:
x
x2
w
h

and in the Scala code, similarly for:
x2
id_x
id_w
h

The join() in Spark is the main thing that is really slow -- because it is
designed for big data sets, it involves serializing data to a file and
reading it back.  That is almost certainly what is taking so long.  And
you are doing one join per element of x2.  Again, without knowing the
meaning of each of those variables, it's hard for me to suggest another way
of doing it ... but given that the R code doesn't involve any strings or
joins, you probably don't need them in Spark either.

3) If the problem really is small, maybe it doesn't make sense to use Spark
at all (related to #1).  Given the overhead associated with each task, you
want each task to do a non-trivial amount of work.  E.g., maybe a better
example of using Spark to speed this up would be doing lots of kernel density
estimates with different bandwidths, and then choosing the best one.
Pseudo-code:

val data: Array[Double] = ...
val bcData = sc.broadcast(data)
val bandwidths = ... // e.g. (0 to 1000).map{_ / 1000.0}

def computeEstimateAndGetError(data: Array[Double], bandwidth: Double): Double = {
  // totally normal Scala code goes here, not involving Spark at all:
  // compute the density estimate on part of the data, get the error on the
  // rest of the data, and return the error
  // maybe even do it on a bunch of different samples of the data, e.g. with
  // cross-validation
  ...
  // just return the error
}

// now use Spark to search all bandwidths
val (bestBandwidth, bestError) = sc.parallelize(bandwidths).map { bandwidth =>
  bandwidth -> computeEstimateAndGetError(bcData.value, bandwidth)
}.collect.minBy{_._2}

(still, maybe not a great example, since you could do something smarter
than just a grid search, but hopefully that gives you an idea)

4) Kind of a follow-on to #3, but your first step might be to just write
some normal Scala code that computes the exact same thing as the R code, for
comparison.
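A minimal sketch of that plain Scala version, assuming x, x2, and w are
Array[Double] and h is a Double (mirroring the names in your R code; the kde
helper is just a name I made up):

// Plain Scala, no Spark: the same computation as the R loop.
def gau(z: Double): Double =
  math.exp(-(z * z) / 2) * (1 / math.sqrt(2 * math.Pi))

def kde(x: Array[Double], x2: Array[Double], w: Array[Double], h: Double): Array[Double] = {
  val sumW = w.sum
  x2.map { x2j =>
    // ker[j] <- sum(w * gau((x2[j] - x) / h)) / (sum(w) * h)
    x.indices.map(i => w(i) * gau((x2j - x(i)) / h)).sum / (sumW * h)
  }
}

val ker = kde(x, x2, w, h)

Timing that directly against the R loop gives you a baseline before any Spark
is involved.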

5) The original R code can be improved: you are copying ker a lot, because
you keep appending one element at a time.  It's better to allocate it at the
right size in the first place:
ker <- numeric(length(x2))
(Depending on the size of x2, the repeated appending could be a major
bottleneck, since growing the vector one element at a time takes on the order
of length(x2)^2 operations in total.)




On Wed, Dec 11, 2013 at 3:26 PM, ABEL ALEJANDRO CORONADO IRUEGAS <
[email protected]> wrote:

> We are trying to implement a univariate kernel density estimation in
> Spark. Our first step was to implement it in R from scratch; here is the
> relevant code:
>
> ## R CODE
> ## gaussian kernel
> gau<-function(z)
> {
>     k<- exp(-(z^2)/2)*(1/sqrt(2*pi))
>     k
> }
>
> ker<-c()
> for ( j in 1:length(x2))
> {  z<-(x2[j]-x)/h
>    ker[j]<-sum(w*gau(z))/(sum(w)*h)
> }
>
> Then we implement similar ideas in the Spark Shell
>
> // SCALA-SPARK CODE
>
> def gau(z: Double) = {
>     scala.math.exp(-scala.math.pow(z, 2) / 2) * (1 / scala.math.sqrt(2 * scala.math.Pi))
> }
>
> def kernel(x2j: Double, id_x: RDD[(java.lang.String, Double)], id_w: RDD[(java.lang.String, Double)], h: Double) = {
>     val z = id_x.mapValues(x => (x2j - x) / h)
>     z.mapValues(gau(_)).join(id_w).map(x => (x._2._1 * x._2._2)).sum / (id_w.map(x => x._2).sum * h)
> }
>
> val ker = x2.map(kernel(_,id_x,id_w,h))
>
> The problem is that R is faster (3 seconds) than Spark (30 minutes) on the
> same machine (quad core). I'm sure it is because we are not using Spark as
> well as possible.
>
> Can anyone help us?
>
> Abel Coronado
> @abxda
>
