Thank you, I have one year of homework ;)
On Thu, Dec 12, 2013 at 2:51 PM, Imran Rashid <[email protected]> wrote:

Ah, got it, makes a lot more sense now. I couldn't figure out what w was; I should have figured it was weights.

As Evan suggested, using zip is almost certainly what you want:

    val pointsAndWeights: RDD[(Double, Double)] = ...

Zipping together id_x and id_w will give you exactly that, but maybe you can just create it directly in the first place; it depends on how you are creating id_x and id_w.

In some ways, a univariate kernel density estimate isn't the perfect task for Spark, since every data point affects all of the 512 output points. But since 512 is so small, there is a pretty good workaround. My next suggestion would be to make this a one-pass computation over the data -- as you look at each point, compute its effect on *all* 512 output positions, e.g.:

    val outputPoints = x2.toArray
    val pointToOutputEffect: RDD[Array[Double]] =
      pointsAndWeights.map { case (point, weight) =>
        val ker = new Array[Double](512)
        (0 until 512).foreach { idx =>
          val z = (outputPoints(idx) - point) / h
          ker(idx) = weight * gau(z)  // gau is the gaussian kernel from the original code
        }
        ker                           // return this point's contribution to each output position
      }

Now we've got an RDD with 283,469 elements in it, each one of which is an array of size 512. Now we just want to do an element-wise sum of those arrays. But we want to make sure we do that sum distributed -- we don't just want to return all of those arrays to the driver and do the sum there. There are a few different ways to do it:

1) Use fold to combine the results within each partition (distributed), and then just add one array per partition on the driver (non-distributed):

    val kernel = pointToOutputEffect.fold(new Array[Double](512)) {
      case (left, right) => left += right
    }

(The += method doesn't exist on arrays; you'd have to write it -- a sketch of one possible helper is included below.)

2) Use aggregate(), and add the results from each data point directly into your sum array. This should be much more efficient -- you don't have to allocate an array once per data point, which is a huge waste in the first approach:

    val kernel = pointsAndWeights.aggregate(new Array[Double](512))(
      { case (ker, (point, weight)) =>
        // given one point, add it into the result we have so far
        (0 until 512).foreach { idx =>
          val z = (outputPoints(idx) - point) / h
          ker(idx) += weight * gau(z)
        }
        ker
      },
      { case (left, right) =>
        // given two results from parts of the data, combine them
        left += right
      }
    )

3) Use accumulators -- sort of like a variant of aggregate(), but with the advantage of letting you accumulate multiple things in one pass. This one is a bit more work -- you need to define some helper classes which say how to merge together two Array[Double] -- and it will be a bit hard to follow, so I'll leave it out for now :)

I'd probably go for #2. I bet it will get you down to a few seconds :)

In all of the approaches I've outlined, there is an important tradeoff in choosing how big each partition is. If you make lots of small partitions, you can make use of more distributed executors, and you'll also get more parallelism if some tasks are bigger, some executors are slower, etc. However, the driver (your "main" program that you use for launching Spark) will have more work to do to collect all the results together and add them up. The fewer tasks you use, the less work the driver has to do to merge it all together.
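A minimal sketch of the += helper Imran mentions, written as an implicit class so that left += right works on plain Array[Double] in the fold and aggregate examples above. The object name ArrayOps is just illustrative; this assumes Scala 2.10+ implicit classes and is untested:

    object ArrayOps {
      // In-place element-wise sum: adds `right` into `left` and returns the
      // mutated `left`, so it can serve as the combine step in fold/aggregate.
      implicit class DoubleArrayOps(val left: Array[Double]) extends AnyVal {
        def +=(right: Array[Double]): Array[Double] = {
          require(left.length == right.length, "arrays must have the same length")
          var i = 0
          while (i < left.length) {
            left(i) += right(i)
            i += 1
          }
          left
        }
      }
    }

With import ArrayOps._ in scope, the left += right calls above resolve to this in-place element-wise sum.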
The code examples assume sum(w) = 1 -- there is some more bookkeeping you need to do if it isn't; hopefully you can add that in. (Potentially a good use of an accumulator.)

Also, I've just been adding doubles together and assuming the numeric accuracy would be good enough, but you may need to consider something like:
https://en.wikipedia.org/wiki/Kahan_summation_algorithm

Finally, if you are tired of writing lots of + methods for arrays, Kahan summations, etc., you might consider using something like algebird:
https://github.com/twitter/algebird
But if you're new to Scala, that will probably look like gibberish -- don't worry about it. It took me a year with Scala before I started to appreciate it :)

On Thu, Dec 12, 2013 at 1:16 PM, ABEL ALEJANDRO CORONADO IRUEGAS <[email protected]> wrote:

First of all, thank you very much, it is a vibrant community.

Sorry for being so brief in my question.

The dataset is going to be bigger: we are processing the Mexican Economic Census data, and some of the data sets are around millions of records. Maybe that is not enough to call it Big Data, but it is big enough for R and other traditional statistical software.

I'm at the National Statistical Office of México, and we are doing a proof of concept with Spark to see if it is a good solution for this kind of problem in my institution -- closer to statistical analysis as in R than to data processing as in the Map-Reduce family. I think Spark is going to mature in the near future in the MLlib direction, with more support for vectorized operations "like" R -- thinking of Spark as the R for Big Data.

Ok, thank you Imran, let me make some comments point by point:

1) Yes, you are right, we are getting a feel for Spark with a small data set. Once the Spark implementation is stable enough we are going to increase the size of the data set and the size of the cluster. But 30 minutes is a bad smell about my implementation of the algorithm.

2) Yes, I'm trying to understand the Spark way; in R everything is a vector and of course nothing is clusterized. I see R as a means to communicate algorithms and ideas with the statistical research area; they are strong in R and in statistics.

Next, the meanings of the R variables are:

    x  # vector with 283,469 observations (Double) from which the density estimation will be made
    x2 # vector of 512 points where the density will be estimated (Double)
    w  # vector with 283,469 weights, one per observation (Double)
    h  # the bandwidth, a single number (Double)

And in Scala:

    id_x // org.apache.spark.rdd.RDD[(java.lang.String, Double)] with 283,469 observations,
         // where the ugly String is the unconverted consecutive integer that helps me
         // join the vectors id_x and id_w
    x2   // 512 values in scala.collection.immutable.NumericRange[Double], where the
         // density will be estimated
    id_w // the 283,469 weights with the ugly String id, corresponding one-to-one with
         // the id in id_x
    h    // one Double for the bandwidth

As you can see, there are some places in the Scala algorithm where it is needed to do element-by-element operations, like in:

    val z = id_x.mapValues(x => (x2j - x) / h)

where z is a new vector with the element-by-element arithmetic transformation of the id_x vector, which still has the string id.
Maybe, as Evan says, in this place we could manually split the vector into jblas subvectors and run vectorized operations to avoid the implicit element-by-element iteration, but at the moment I don't see how to do that.

The other place where the element-by-element operations are done is here:

    z.mapValues(gau(_)).join(id_w).map(x => (x._2._1 * x._2._2)).sum

Here is where the most interesting thing happens in terms of parallelism, and where the optimizations could be done, but I don't know exactly how. The idea is that an element-by-element operation between two big vectors is needed; in R it is simply:

    sum(w * gau(z))

If I understand well, this is just what Evan recommends via the ZippedRDD, and it lets me use the vectors directly, without the ugly string id.

3) Thanks for the idea.

4) Maybe we are close to finishing the Scala-Spark implementation, and it's not necessary to do a pure Scala implementation.

5) Thank you, we are going to improve the R code.

And finally, Evan, could you give some hints about how to pack the vectors into a jblas DoubleMatrix? I mean, how is it possible to control the partitioning? I would appreciate any direction on that.

Thanks again to the Spark community!!!

On Wed, Dec 11, 2013 at 5:24 PM, Evan R. Sparks <[email protected]> wrote:

Agreed with Imran - without knowing the size/shape of the objects in your program, it's tough to tell where the bottleneck is. Additionally, unless the problem is really big (in terms of your w and x vectors), it's unlikely that you're going to be CPU bound on the cluster - communication and stragglers become the dominant effect here.

I also agree that the join is causing a big bottleneck.

Even after eliminating the join, and with really big w and x vectors, you may still see Spark run "slow" compared to your R program. This may be because many of the operations you use in R are vectorized, while with the Spark code you instead seem to be making your vectors a collection of (offset, value) pairs.

There are several alternative strategies here:
1) Instead of a join, use a zip() if the two RDDs are the same size and are partitioned identically. This will be much faster than a join.
2) An alternative strategy that could allow you to get vectorized ops in Spark while still operating in a distributed fashion is to pack your partitions into something like a jblas DoubleMatrix, one per partition, where each partition is a contiguous block of elements. Then you can apply your gaussian transformation and element-wise multiply, etc. using jblas, which links in BLAS - a low-level linear algebra library (not unlike what R is using under the covers) - for speed.

Again - with 2 you could still do it as a zip.

For examples of using jblas with Spark code, take a look at the MLlib codebase in the Spark repository on git.

- Evan

On Wed, Dec 11, 2013 at 4:51 PM, Imran Rashid <[email protected]> wrote:

These are just thoughts off the top of my head:

1) If the original R code runs in 3 secs, you are unlikely to be able to improve that drastically with Spark. Yes, Spark can run sub-second jobs, but no matter what, don't expect Spark to get you into the < 10 millisecond range.
While Spark has much lower overhead than Hadoop at running jobs, there is definitely still significant overhead. It's really meant to speed up bigger computations. (Still, I understand the desire to start w/ something small to get a feel for it.)

2) Spark is doing a *lot* more work the way you have it coded now. I'm not entirely sure what is going on, b/c I can't see what all the variables mean. But at the very least, the Spark code is dealing w/ different data, since I see Strings are involved in the Spark code.

In the R code, can you tell me the meaning and size of:

    x
    x2
    w
    h

and in the Scala code, similarly for:

    x2
    id_x
    id_w
    h

The join() in Spark is the main thing that is really slow -- b/c it's designed for big data sets, it involves serializing data to a file and reading it back. That is almost certainly what is taking a long time. And you are doing one join per element in x. Again, without knowing the meaning of each of those variables, it's hard for me to suggest another way of doing it ... but given that the R code doesn't have any strings or joins involved, you probably don't need them in Spark.

3) If the problem really is small, maybe it doesn't make sense to use Spark (related to #1). Given the overhead associated w/ each task, you want tasks to be non-trivial amounts of work. E.g., maybe a better example of using Spark to speed this up would be doing lots of kernel density estimates w/ different bandwidths, and then choosing the best one. Pseudo-code:

    val data: Array[Double] = ...
    val bcData = sc.broadcast(data)
    val bandwidths = ... // eg (0 to 1000).map{_ / 1000.0}

    def computeEstimateAndGetError(data: Array[Double], bandwidth: Double): Double = {
      // totally normal scala code goes here, not involving spark at all:
      // compute the density estimate on part of the data, get the error on
      // the rest of the data, and return the error
      // (maybe even do it on a bunch of different samples of the data, e.g. w/ cross-validation)
      ...
      // just return the error
    }

    // now use spark to search all bandwidths
    val (bestBandwidth, bestError) = sc.parallelize(bandwidths).map { bandwidth =>
      bandwidth -> computeEstimateAndGetError(bcData.value, bandwidth)
    }.collect.minBy{_._2}

(Still, maybe not a great example, since you could do something smarter than just a grid search, but hopefully it gives you an idea.)

4) Kind of a follow-on to #3, but your first step might be to just write some normal Scala code to compute the exact same thing as R, for comparison.

5) The original R code can be improved: you are copying ker a lot, because you keep appending only one element to it.
It is better to allocate it at the right size in the first place:

    ker <- numeric(length(x2))

(Depending on the size of x2, this could be a major bottleneck, as the repeated appending requires on the order of length(x2)^2 operations.)

On Wed, Dec 11, 2013 at 3:26 PM, ABEL ALEJANDRO CORONADO IRUEGAS <[email protected]> wrote:

We are trying to implement a univariate kernel density estimation in Spark. Our first step was to implement it in R from scratch; here is the relevant code:

    ## R CODE
    ## gaussian kernel
    gau <- function(z)
    {
      k <- exp(-(z^2)/2) * (1/sqrt(2*pi))
      k
    }

    ker <- c()
    for (j in 1:length(x2))
    {
      z <- (x2[j] - x) / h
      ker[j] <- sum(w * gau(z)) / (sum(w) * h)
    }

Then we implemented similar ideas in the Spark shell:

    // SCALA-SPARK CODE
    def gau(z: Double) = {
      scala.math.exp(-scala.math.pow(z, 2) / 2) * (1 / scala.math.sqrt(2 * scala.math.Pi))
    }

    def kernel(x2j: Double, id_x: RDD[(java.lang.String, Double)],
               id_w: RDD[(java.lang.String, Double)], h: Double) = {
      val z = id_x.mapValues(x => (x2j - x) / h)
      z.mapValues(gau(_)).join(id_w).map(x => (x._2._1 * x._2._2)).sum /
        (id_w.map(x => x._2).sum * h)
    }

    val ker = x2.map(kernel(_, id_x, id_w, h))

The problem is that R is faster (3 sec) than Spark (30 min) on the same machine (quad core); I'm sure it is because we are not using Spark as well as possible.

Can anyone help us?

Abel Coronado
@abxda
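Following up on Imran's point 4 above (write normal Scala code that computes the exact same thing as R), a minimal plain-Scala baseline, with no Spark involved, could look like the sketch below. It mirrors the R loop ker[j] = sum(w * gau((x2[j] - x) / h)) / (sum(w) * h); the name kdeBaseline is just illustrative, and this is a sketch rather than code tested against the R output:

    import scala.math.{exp, sqrt, Pi}

    // Gaussian kernel, same formula as the gau function above.
    def gau(z: Double): Double = exp(-(z * z) / 2) / sqrt(2 * Pi)

    // Weighted univariate kernel density estimate at each point of x2,
    // computed locally on plain arrays (no Spark).
    def kdeBaseline(x: Array[Double], w: Array[Double],
                    x2: Array[Double], h: Double): Array[Double] = {
      val sumW = w.sum
      x2.map { x2j =>
        var acc = 0.0
        var i = 0
        while (i < x.length) {
          acc += w(i) * gau((x2j - x(i)) / h)
          i += 1
        }
        acc / (sumW * h)
      }
    }

Timing this against the R version would show how much of the 30 minutes comes from the join-heavy Spark formulation rather than from the computation itself.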

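For the numeric-accuracy point Imran raises above (the Kahan summation link), here is a minimal sketch of compensated summation that could replace a plain running sum if straightforward Double accumulation turns out not to be accurate enough. It is only an illustration of the algorithm from the linked article, not code from the thread:

    // Kahan (compensated) summation: carries a small correction term `c`
    // so that low-order bits lost when adding many Doubles of different
    // magnitudes are re-added on later iterations.
    def kahanSum(values: Seq[Double]): Double = {
      var sum = 0.0
      var c = 0.0
      for (v <- values) {
        val y = v - c        // apply the running correction to the new value
        val t = sum + y      // low-order bits of y may be lost here
        c = (t - sum) - y    // recover what was lost
        sum = t
      }
      sum
    }

The same pattern could be applied per output position in the aggregate approach by keeping a second Array[Double] of correction terms alongside the sums.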