Agreed with Imran - without knowing the size/shape of the objects in your program, it's tough to tell where the bottleneck is. Additionally, unless the problem is really big (in terms of your w and x vectors), it's unlikely that you're going to be CPU-bound on the cluster - communication and stragglers become the dominant effects here.
I also agree that the join is causing a big bottleneck. Even after eliminating the join, though, and even with really big w and x vectors, you may still see spark run "slow" compared to your R program. That is likely because many of the operations you use in R are vectorized, whereas in the spark code you are representing your vectors as a distributed collection of (id, value) pairs and operating on them one element at a time. A couple of alternative strategies here:

1) Instead of a join, use a zip() if the two RDDs are the same size and are partitioned identically. This will be much faster than a join (rough sketch below).

2) To get vectorized ops in spark while still operating in a distributed fashion, pack your data into something like a jblas DoubleMatrix, one per partition, where each partition holds a contiguous block of elements. Then you can apply your gaussian transformation, elementwise multiply, etc. using jblas, which links in BLAS, a low-level linear algebra library (not unlike what R is using under the covers), for speed. With 2) you can still line the two RDDs up with a zip rather than a join (rough sketch below as well). For real examples of using jblas with spark code, take a look at the mllib codebase in the spark repository.
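To make 1) concrete, here is a rough, untested sketch. It assumes the String ids in id_x and id_w aren't needed for the math, that the two RDDs really do line up element for element with identical partitioning (a requirement of zip()), and that x2 is a plain local collection of evaluation points, as in your code; the xs/ws/sumW names are just for the sketch:

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._  // pair/double RDD implicits (already in scope in the spark shell)

def gau(z: Double): Double =
  math.exp(-z * z / 2) / math.sqrt(2 * math.Pi)

// zip() requires the same number of partitions and the same number of
// elements per partition in both RDDs.
// val xs: RDD[Double] = id_x.values
// val ws: RDD[Double] = id_w.values

def kernel(x2j: Double, xs: RDD[Double], ws: RDD[Double], sumW: Double, h: Double): Double = {
  // one pass over the zipped data, no shuffle and no join
  val num = xs.zip(ws).map { case (xi, wi) => wi * gau((x2j - xi) / h) }.sum
  num / (sumW * h)
}

// sum(w) doesn't depend on x2(j), so compute it once up front:
// val sumW = ws.sum
// val ker = x2.map(kernel(_, xs, ws, sumW, h))

If x2 has many points, caching the zipped RDD (xs.zip(ws).cache()) also helps, since otherwise it gets rebuilt on every kernel evaluation.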
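And a very rough, untested sketch of 2), reusing the same xs and ws and assuming jblas is on the classpath (mllib already depends on it). The idea is to pack each partition into a DoubleMatrix once, cache that, and then do the per-point work with vectorized operations:

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._  // double RDD implicits (already in scope in the spark shell)
import org.jblas.{DoubleMatrix, MatrixFunctions}

// Pack each partition's x and w values into a pair of column vectors, once.
def pack(xs: RDD[Double], ws: RDD[Double]): RDD[(DoubleMatrix, DoubleMatrix)] =
  xs.zip(ws).mapPartitions { iter =>
    val (xArr, wArr) = iter.toArray.unzip
    Iterator((new DoubleMatrix(xArr), new DoubleMatrix(wArr)))
  }.cache()

def kernelVectorized(x2j: Double, packed: RDD[(DoubleMatrix, DoubleMatrix)],
                     sumW: Double, h: Double): Double = {
  val num = packed.map { case (xVec, wVec) =>
    // z = (x2j - x) / h, elementwise
    val z = xVec.neg().addi(x2j).divi(h)
    // gau(z) = exp(-z^2 / 2) / sqrt(2*pi), elementwise
    val g = MatrixFunctions.exp(z.mul(z).divi(-2.0)).divi(math.sqrt(2 * math.Pi))
    // this partition's contribution to sum_i w_i * gau(z_i)
    wVec.dot(g)
  }.sum
  num / (sumW * h)
}

// val packed = pack(xs, ws)
// val sumW = ws.sum
// val ker = x2.map(kernelVectorized(_, packed, sumW, h))

Whether this beats the plain zip version depends on how big the partitions are; for small data the per-task overhead will dominate either way.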
- Evan

On Wed, Dec 11, 2013 at 4:51 PM, Imran Rashid <[email protected]> wrote:

> These are just thoughts off the top of my head:
>
> 1) If the original R code runs in 3 secs, you are unlikely to be able to
> improve on that drastically with spark. Yes, spark can run sub-second
> jobs, but no matter what, don't expect spark to get you into the < 10
> millisecond range. While spark has much lower overhead than hadoop at
> running jobs, there is definitely still significant overhead. It's really
> meant to speed up bigger computations. (Still, I understand the desire to
> start with something small to get a feel for it.)
>
> 2) Spark is doing a *lot* more work the way you have it coded now. I'm
> not entirely sure what is going on, because I can't see what all the
> variables mean. But at the very least, the spark code is dealing with
> different data, since I see Strings involved in the spark code.
>
> In the R code, can you tell me the meaning and size of:
> x
> x2
> w
> h
>
> and in the scala code, similarly for:
> x2
> id_x
> id_w
> h
>
> The join() in spark is the main thing that is really slow: because it is
> designed for big data sets, it involves serializing data to a file and
> reading it back. That is almost certainly what is taking a long time, and
> you are doing one join per element of x2. Without knowing the meaning of
> each of those variables it's hard for me to suggest another way of doing
> it, but given that the R code doesn't have any strings or joins involved,
> you probably don't need them in spark.
>
> 3) If the problem really is small, maybe it doesn't make sense to use
> spark (related to #1). Given the overhead associated with each task, you
> want each task to be a non-trivial amount of work. E.g., a better example
> of using spark to speed this up might be computing lots of kernel density
> estimates with different bandwidths and then choosing the best one.
> Pseudo-code:
>
> val data: Array[Double] = ...
> val bcData = sc.broadcast(data)
> val bandwidths = ... // e.g. (0 to 1000).map{_ / 1000.0}
>
> def computeEstimateAndGetError(data: Array[Double], bandwidth: Double): Double = {
>   // totally normal scala code goes here, not involving spark at all:
>   // compute the density estimate on part of the data, get the error on
>   // the rest of the data, and return the error
>   // (maybe even do it on a bunch of different samples of the data,
>   // e.g. with cross-validation)
>   ...
>   // just return the error
> }
>
> // now use spark to search all the bandwidths
> val (bestBandwidth, bestError) = sc.parallelize(bandwidths).map{ bandwidth =>
>   bandwidth -> computeEstimateAndGetError(bcData.value, bandwidth)
> }.collect.minBy{_._2}
>
> (Still, maybe not a great example, since you could do something smarter
> than just a grid search, but hopefully that gives you an idea.)
>
> 4) Kind of a follow-on to #3, but your first step might be to just write
> some normal scala code that computes the exact same thing as the R code,
> for comparison.
>
> 5) The original R code can be improved: you are copying ker a lot, because
> you keep appending one element at a time. It is better to allocate it at
> the right size in the first place:
>
> ker <- numeric(length(x2))
>
> (Depending on the size of x2, that copying could be a major bottleneck, as
> it requires on the order of length(x2)^2 operations.)
>
>
> On Wed, Dec 11, 2013 at 3:26 PM, ABEL ALEJANDRO CORONADO IRUEGAS
> <[email protected]> wrote:
>
>> We are trying to implement a univariate kernel density estimation in
>> Spark. Our first step was to implement it in R from scratch; here is the
>> relevant code:
>>
>> ## R CODE
>> ## gaussian kernel
>> gau <- function(z)
>> {
>>   k <- exp(-(z^2)/2)*(1/sqrt(2*pi))
>>   k
>> }
>>
>> ker <- c()
>> for (j in 1:length(x2))
>> {
>>   z <- (x2[j]-x)/h
>>   ker[j] <- sum(w*gau(z))/(sum(w)*h)
>> }
>>
>> Then we implemented similar ideas in the Spark shell:
>>
>> // SCALA-SPARK CODE
>>
>> def gau(z: Double) = {
>>   scala.math.exp(-scala.math.pow(z,2)/2)*(1/scala.math.sqrt(2*scala.math.Pi))
>> }
>>
>> def kernel(x2j: Double, id_x: RDD[(java.lang.String, Double)],
>>            id_w: RDD[(java.lang.String, Double)], h: Double) = {
>>   val z = id_x.mapValues(x => (x2j - x)/h)
>>   z.mapValues(gau(_)).join(id_w).map(x => x._2._1 * x._2._2).sum / (id_w.map(x => x._2).sum * h)
>> }
>>
>> val ker = x2.map(kernel(_, id_x, id_w, h))
>>
>> The problem is that R is faster (3 sec) than Spark (30 min) on the same
>> machine (a quad core). I'm sure it is because we are not using Spark as
>> well as possible.
>>
>> Can anyone help us?
>>
>> Abel Coronado
>> @abxda
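P.S. For completeness, here is one rough, untested way the skeleton in Imran's #3 above could be filled in. The hold-out scoring (negative mean log-likelihood of the second half of the data under an estimate built from the first half) is just one arbitrary choice to make the example concrete, and it assumes all bandwidths are > 0:

import org.apache.spark.SparkContext

def gau(z: Double): Double =
  math.exp(-z * z / 2) / math.sqrt(2 * math.Pi)

// Plain scala, no spark involved: build the estimate from the first half of
// the data and score the second half under it. Lower is better.
def computeEstimateAndGetError(data: Array[Double], bandwidth: Double): Double = {
  val (train, test) = data.splitAt(data.length / 2)
  val logLik = test.map { t =>
    val density = train.map(x => gau((t - x) / bandwidth)).sum / (train.length * bandwidth)
    math.log(density + 1e-300)  // guard against log(0)
  }
  -(logLik.sum / logLik.length)
}

// Spark parallelizes the search over bandwidths; each task is a non-trivial chunk of work.
def bestBandwidth(sc: SparkContext, data: Array[Double], bandwidths: Seq[Double]): (Double, Double) = {
  val bcData = sc.broadcast(data)
  sc.parallelize(bandwidths)
    .map { bw => bw -> computeEstimateAndGetError(bcData.value, bw) }
    .collect()
    .minBy(_._2)
}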
