Agreed with Imran - without knowing the size/shape of the objects in your
program, it's tough to tell where the bottleneck is. Additionally, unless
the problem is really big (in terms of your w and x vectors), it's
unlikely that you're going to be CPU bound on the cluster - communication
and stragglers become the dominant effects here.

I also agree that the join is causing a big bottleneck.

Even after eliminating the join, and even with really big w and x vectors,
you may still see spark run "slow" compared to your R program. This
may be because many of the operations you use in R are vectorized, while in
the spark code you instead seem to be representing your vectors as a
collection of (offset, value) pairs processed one element at a time.

There are several alternative strategies here:
1) Instead of a join, use a zip() if the two RDDs are the same size and are
partitioned identically. This will be much faster than a join (see the first
sketch just after this list).
2) An alternative strategy that could get you vectorized ops in
spark while still operating in a distributed fashion is to pack your
partitions into something like a jblas DoubleMatrix, one per partition,
where each partition is a contiguous block of elements. Then you can apply
your gaussian transformation, elementwise multiply, etc. using jblas,
which links against BLAS - a low-level linear algebra library (not unlike
what R uses under the covers) - for speed. (See the second sketch below.)
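
For 1), here is a rough, untested sketch of what I mean. It assumes id_x and
id_w can just be RDD[Double] (call them x and w - no String keys needed), that
they have the same number of elements, and that they were created with
identical partitioning (e.g. parallelized from two arrays with the same number
of slices). gau() is your function from below; kernelZip and wSum are just
names I made up:

import org.apache.spark.SparkContext._   // for sum() on RDD[Double]
import org.apache.spark.rdd.RDD

def kernelZip(x2j: Double, x: RDD[Double], w: RDD[Double],
              wSum: Double, h: Double): Double = {
  // zip pairs elements up by position within co-partitioned RDDs:
  // no shuffle, and no spilling a join to disk
  val num = x.zip(w).map { case (xi, wi) => wi * gau((x2j - xi) / h) }.sum
  num / (wSum * h)
}

// compute sum(w) once, rather than once per element of x2:
// val wSum = w.sum
// val ker = x2.map(kernelZip(_, x, w, wSum, h))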

Again - with 2) you could still do it as a zip; the sketch below combines the two.
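
Again untested and just a sketch of the idea - same co-partitioning assumption
as above, toBlocks/kernelBlas are made-up names, and the vector ops come from
org.jblas.DoubleMatrix:

import org.apache.spark.SparkContext._   // for sum() on RDD[Double]
import org.apache.spark.rdd.RDD
import org.jblas.{DoubleMatrix, MatrixFunctions}

// pack each partition into one contiguous jblas vector, so the arithmetic
// runs through blas instead of a scala closure per element
def toBlocks(rdd: RDD[Double]): RDD[DoubleMatrix] =
  rdd.mapPartitions(iter => Iterator(new DoubleMatrix(iter.toArray)))

def kernelBlas(x2j: Double, xBlocks: RDD[DoubleMatrix],
               wBlocks: RDD[DoubleMatrix], wSum: Double, h: Double): Double = {
  val num = xBlocks.zip(wBlocks).map { case (xb, wb) =>
    val z = xb.rsub(x2j).divi(h)                        // (x2j - x) / h, vectorized
    val e = MatrixFunctions.exp(z.muli(z).divi(-2.0))   // exp(-z^2 / 2)
    val g = e.muli(1.0 / math.sqrt(2.0 * math.Pi))      // gaussian kernel values
    g.dot(wb)                                           // sum(w * gau(z)) for this block
  }.sum
  num / (wSum * h)
}

// val xBlocks = toBlocks(x).cache()
// val wBlocks = toBlocks(w).cache()
// val ker = x2.map(kernelBlas(_, xBlocks, wBlocks, wSum, h))

The per-block math is the same thing your R code does, just one contiguous
chunk at a time.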

For examples of using jblas with spark code, take a look at the mllib
codebase in the spark git repository.

- Evan


On Wed, Dec 11, 2013 at 4:51 PM, Imran Rashid <[email protected]> wrote:

> these are just thoughts off the top of my head:
>
> 1) if the original R code runs in 3 secs, you are unlikely to be able to
> improve on that drastically with spark.  Yes, spark can run sub-second jobs,
> but no matter what, don't expect spark to get you into the < 10 millisecond
> range.  While spark has much lower overhead than hadoop at running jobs,
> there is definitely still significant overhead.  It's really meant to speed
> up bigger computations.  (Still, I understand the desire to start w/
> something small to get a feel for it.)
>
> 2) Spark is doing a *lot* more work the way you have it coded now.  I'm
> not entirely sure what is going on b/c I can't see what all the variables
> mean.  But at the very least, the spark code is dealing w/ different data,
> since I see Strings are involved.
>
> In the R code, can you tell me the meaning of & size of:
> x
> x2
> w
> h
>
> and in the scala code, similarly for:
> x2
> id_x
> id_w
> h
>
> The join() in spark is the main thing that is really slow -- b/c it's
> designed for big data sets, it involves serializing data to a file and
> reading it back.  That is almost certainly what is taking a long time.  And
> you are doing one join per element in x.  Again, without knowing the
> meaning of each of those variables, it's hard for me to suggest another way
> of doing it ... but given that the R code doesn't have any strings or joins
> involved, you probably don't need them in spark.
>
> 3) If the problem really is small, maybe it doesn't make sense to use
> spark (related to #1).  Given the overhead associated w/ each task, you
> want each task to be a non-trivial amount of work.  E.g., maybe a better way
> of using spark to speed this up would be doing lots of kernel
> density estimates w/ different bandwidths, and then choosing the best
> one.  pseudo-code:
>
> val data : Array[Double] = ...
> val bcData = sc.broadcast(data)
> val bandwidths = ... //eg (0 to 1000).map{_ / 1000.0}
> def computeEstimateAndGetError(data: Array[Double], bandwidth: Double): Double = {
>   // totally normal scala code goes here, not involving spark at all:
>   // compute the density estimate on part of the data, get the error on the
>   // rest of the data, and return the error
>   // maybe even do it on a bunch of different samples of data, eg. w/
>   // cross-validation
>   ...
>   // just return the error
> }
>
> //now use spark to search all bandwidths
> val (bestBandwidth, bestError) = sc.parallelize(bandwidths).map{ bandwidth =>
>   bandwidth -> computeEstimateAndGetError(bcData.value, bandwidth)
> }.collect.minBy{_._2}
>
> (still, maybe not a great example, since you could do something smarter
> than just a grid search, but hopefully that gives you an idea)
>
> 4) kind of a follow-on to #3, but your first step might be to just write
> some normal scala code that computes the exact same thing as R, for
> comparison.
>
> 5) The original R code can be improved: you are copying ker a lot, because
> you keep appending one element at a time to it.  It's better to allocate it
> at the right size in the first place:
> ker <- numeric(length(x2))
> (depending on the size of x2, this repeated copying could be a major
> bottleneck, as it requires on the order of length(x2)^2 operations)
>
>
>
>
> On Wed, Dec 11, 2013 at 3:26 PM, ABEL ALEJANDRO CORONADO IRUEGAS <
> [email protected]> wrote:
>
>> We are trying to implement a univariate kernel density estimation in
>> Spark. Our first step was to implement it in R from scratch; here is the
>> relevant code:
>>
>> ## R CODE
>> ## gaussian kernel
>> gau<-function(z)
>> {
>>     k<- exp(-(z^2)/2)*(1/sqrt(2*pi))
>>     k
>> }
>>
>> ker<-c()
>> for ( j in 1:length(x2))
>> {  z<-(x2[j]-x)/h
>>    ker[j]<-sum(w*gau(z))/(sum(w)*h)
>> }
>>
>> Then we implemented similar ideas in the Spark shell:
>>
>> // SCALA-SPARK CODE
>>
>> def gau(z: Double) = {
>>     scala.math.exp(-scala.math.pow(z, 2) / 2) * (1 / scala.math.sqrt(2 * scala.math.Pi))
>> }
>>
>> def kernel(x2j: Double, id_x: RDD[(java.lang.String, Double)], id_w: RDD[(java.lang.String, Double)], h: Double) = {
>>     val z = id_x.mapValues(x => (x2j - x) / h)
>>     z.mapValues(gau(_)).join(id_w).map(x => x._2._1 * x._2._2).sum / (id_w.map(x => x._2).sum * h)
>> }
>>
>> val ker = x2.map(kernel(_,id_x,id_w,h))
>>
>> The problem is that R is faster (3 sec) than Spark (30 min) on the same
>> machine (quad core). I'm sure it is because we are not using Spark as well
>> as we could.
>>
>> Can anyone help us?
>>
>> Abel Coronado
>> @abxda
>>
>
>
