ah, got it, makes a lot more sense now. I couldn't figure out what w was,
I should have figured it was weights.
As Evan suggested, using zip is almost certainly what you want.
val pointsAndWeights: RDD[(Double,Double)] = ...
zipping together id_x and id_w will give you exactly that, but maybe you
can just create it directly in the first place; it depends on how you are
creating id_x & id_w.
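If you do end up building it from the two existing RDDs, a minimal sketch
(this assumes id_x and id_w have the same number of partitions and the same
number of elements in each partition, which zip() requires):

val pointsAndWeights =
  id_x.zip(id_w).map { case ((_, x), (_, w)) => (x, w) }  // drop the string ids, keep (observation, weight)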
In some ways, a univariate kernel density estimate isn't the perfect task
for spark, since every data point affects all of the 512 output points.
But, since 512 is so small, there is a pretty good workaround. My next
suggestion would be to make this "one pass" over the data -- as you
look at each point, compute its effect on *all* 512 output positions, e.g.:
val outputPoints = x2.toArray
val pointToOutputEffect: RDD[Array[Double]] =
  pointsAndWeights.map { case (point, weight) =>
    val ker = new Array[Double](512)
    (0 until 512).foreach { idx =>
      val z = (outputPoints(idx) - point) / h
      ker(idx) = weight * gau(z)  // this point's contribution to output position idx
    }
    ker  // return the full array of 512 contributions for this point
  }
now we've got an RDD with 283,469 elements in it, each of which is an
array of size 512. Now we just want to do an element-wise sum of
those arrays. But we want to make sure we do that sum distributed -- we
don't just want to return all of those arrays to the driver and do the sum
there. There are a few different ways to do it:
1) use fold() to combine the results within each partition (distributed), and
then just add one array per partition on the driver (non-distributed):

val kernel = pointToOutputEffect.fold(new Array[Double](512)) {
  case (left, right) => left += right
}

(the += method doesn't exist on arrays, you'd have to write it -- see the
sketch after this list)
2) use aggregate(), and just add the contribution of each data point directly
into your sum array. This should be much more efficient -- you don't have
to allocate an array per data point, which is a huge waste in the
first approach:
val kernel = pointsAndWeights.aggregate(new Array[Double](512))(
  { case (ker, (point, weight)) =>
    // given one point, add its effect into the result we have so far
    (0 until 512).foreach { idx =>
      val z = (outputPoints(idx) - point) / h
      ker(idx) += weight * gau(z)
    }
    ker
  },
  { case (left, right) =>
    // given two results from parts of the data, combine them (same += as in #1)
    left += right
  }
)
3) use accumulators -- sort of like a variant of aggregate(), but with the
advantage of letting you accumulate multiple things in one pass. This one
is a bit more work (you need to define some helper classes which say how
to merge two Array[Double]s together) and it will be a bit hard to follow,
so I'll leave it out for now :)
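For what it's worth, the += used in #1 and #2 could be written roughly like
this (the names are arbitrary, nothing built in; the important part is that
it adds right into left in place and returns left, which is what fold and
aggregate expect):

class RichDoubleArray(left: Array[Double]) {
  // element-wise, in-place add: left(i) += right(i) for every i, then return left
  def +=(right: Array[Double]): Array[Double] = {
    var i = 0
    while (i < left.length) {
      left(i) += right(i)
      i += 1
    }
    left
  }
}
implicit def toRichDoubleArray(a: Array[Double]): RichDoubleArray = new RichDoubleArray(a)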
I'd probably go for #2. I bet it will get you down to a few seconds :)
In all of the approaches I've outlined, there is an important tradeoff in
choosing how big each partition is. If you make lots of small partitions,
then you can make use of more distributed executors, and you'll also get
better load balancing if some tasks are bigger, some executors are slower,
etc. However, the driver (your "main" program that you use for launching
spark) will have more work to do to collect all the results together and
add them up. The fewer tasks you use, the less work the driver has to do
to merge it all together.
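You typically pick the number of partitions when you create the RDD, or
adjust it afterwards. A rough sketch (localPointsAndWeights is just a
placeholder name for whatever in-memory collection you start from, and the
numbers are only examples -- tune them for your cluster):

// explicit number of partitions when creating the RDD
val pointsAndWeights = sc.parallelize(localPointsAndWeights, 64)

// or shrink the number of partitions of an existing RDD
val fewerPartitions = pointsAndWeights.coalesce(8)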
The code examples assume sum(w) = 1 -- there is some more bookkeeping you
need to do if it isn't; hopefully you can add that in. (Potentially a good
use of an accumulator.)
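For example, a simple (if two-pass) sketch of that bookkeeping, matching the
sum(w) * h denominator in your R code:

val wSum = pointsAndWeights.map { case (_, weight) => weight }.sum  // extra pass over the data
val density = kernel.map(_ / (wSum * h))  // normalize all 512 outputs at once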
Also, I've just been adding doubles together and assuming the numeric
accuracy would be good enough, but you may need to consider something like:
https://en.wikipedia.org/wiki/Kahan_summation_algorithm
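For the scalar case, the compensated sum looks roughly like this (for the
512-element array you'd keep one compensation term per output position):

// Kahan (compensated) summation: carries a correction term for the
// low-order bits that a plain running sum would drop
def kahanSum(values: TraversableOnce[Double]): Double = {
  var sum = 0.0
  var c = 0.0
  values.foreach { v =>
    val y = v - c
    val t = sum + y
    c = (t - sum) - y
    sum = t
  }
  sum
}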
Finally, if you are tired of writing lots of + methods for arrays, Kahan
summations, etc., you might consider using something like algebird:
https://github.com/twitter/algebird
but if you're new to scala, that will probably look like gibberish; don't
worry about it. It took me a year with scala before I started to
appreciate it :)
On Thu, Dec 12, 2013 at 1:16 PM, ABEL ALEJANDRO CORONADO IRUEGAS <
[email protected]> wrote:
> First of all, thank you very much, it is a vibrant community.
>
> Sorry for being so frugal in my question.
>
>
> The dataset is going to be bigger: we are processing the Mexican Economic
> Census data, and some of the data sets are around millions of records. Maybe
> it is not enough to call it Big Data, but it is big enough for R and other
> traditional statistical software.
>
>
> I'm in the National Statistical Office of México and we are doing a proof
> of concept with Spark to see if it is a good solution for this kind of
> problem in my institution, closer to statistical analysis as in R than to
> data processing as in the Map-Reduce family. I think Spark is going to mature
> in the near future in the MLlib direction, with more support for vectorized
> operations “like” R, thinking of Spark as the R for Big Data.
>
>
> Ok, thank you Imran let me do some comments point by point
>
> 1) Yes, you are right, we are getting a feel for spark with a small
> data set. Once the spark implementation is stable enough we are going to
> increase the size of the data set and the size of the cluster. But 30 mins
> is a bad smell about my implementation of the algorithm.
>
> 2) Yes, I'm trying to understand the Spark way. In R everything
> is a vector and of course nothing is clusterized. I see R as a means to
> communicate algorithms and ideas with the statistical research area; they
> are strong in R and in statistics.
>
> Next, the meaning of the R variables are:
>
> x # vector with 283,469 observations (Double) from which the density
> estimation will be made
>
> x2 # vector of 512 points where the density will be estimated (Double)
>
> w # vector with 283,469 weights one per each observation (Double)
>
> h # the bandwidth 1 number (Double)
>
>
>
> in Scala
>
> id_x // org.apache.spark.rdd.RDD[(java.lang.String, Double)] with
> 283,469 observations. The ugly String is the consecutive integer id (left
> unconverted) that helps me to join the vectors id_x and id_w.
>
> x2 // 512 values in scala.collection.immutable.NumericRange[Double], where
> the density will be estimated
>
> id_w // the 283,469 weights with the ugly String Id, corresponding One to
> One with the id in id_x.
>
> h // One Double for the bandwidth
>
>
>
> As you can see, there are some places in the scala algorithm where
> element-by-element operations are needed, like in:
>
> val z = id_x.mapValues(x=>(x2j-x)/h)
>
>
>
> where z is a new vector with the element-by-element arithmetic
> transformation of the id_x vector, which still has the string id. Maybe, as
> Evan says, in this place we can manually split the vector into jblas
> subvectors and run vectorized operations to avoid the implicit
> element-by-element iteration, but at the moment I don't see how to do that.
>
> The other place where the element-by-element operations are done is here:
>
> z.mapValues(gau(_)).join(id_w).map(x=>(x._2._1*x._2._2)).sum
>
>
>
> Here is where the most interesting thing happens in terms of parallelism
> and where the optimizations could be done, but I don't know exactly how. The
> idea there is that an element-by-element operation between two big vectors
> is needed; in R it's simple:
>
> sum(w*gau(z))
>
>
>
> If I understand well, it's just what Evan recommends via the ZippedRDD,
> and it would let me use the vectors directly without the ugly string id.
>
>
>
> 3) Thanks for the idea.
>
> 4) Maybe we are close to finishing the Scala-Spark implementation
> and it is not necessary to do a pure Scala implementation.
>
> 5) Thank you, we are going to improve the R code.
>
>
>
> And finally, Evan, could you give some hints about how to pack the vectors
> into a jblas DoubleMatrix, I mean how it is possible to control the
> partitioning? I would greatly appreciate any direction on that.
>
> Thanks Again to the Spark community !!!
>
>
> On Wed, Dec 11, 2013 at 5:24 PM, Evan R. Sparks <[email protected]> wrote:
>
>> Agreed with Imran - without knowing the size/shape of the objects in your
>> program, it's tough to tell where the bottleneck is. Additionally, unless
>> the problem is really big (in terms of your w and x vectors), it's
>> unlikely that you're going to be CPU bound on the cluster - communication
>> and stragglers become the dominant effect here.
>>
>> I also agree that the join is causing a big bottleneck.
>>
>> Even after eliminating the join and with really big w and x vectors,
>> though, you may still see spark run "slow" compared to your R program. This
>> may be because many of the operations you use in R are vectorized, and with
>> the spark code, you instead seem to be making your vectors a collection of
>> (offset,value) pairs.
>>
>> There are several alternative strategies here:
>> 1) Instead of a join, use a zip() if the two RDDs are the same size and
>> are partitioned identically. This will be much faster than a join.
>> 2) An alternative strategy that could allow you to get vectorized ops in
>> spark while still operating in a distributed fashion is to pack your
>> partitions into something like a jblas DoubleMatrix, one per partition,
>> where each partition is a contiguous block of elements. Then you can apply
>> your gaussian transformation and elementwise multiply, etc. using jblas,
>> which links in blas - a low-level linear algebra library (not unlike what R
>> is using under the covers) for speed.
>>
>> Again - with 2 you could still do it as a zip.
>>
>> For examples of using jblas with spark code, take a look at the mllib
>> codebase in the spark repository on git.
>>
>> - Evan
>>
>>
>> On Wed, Dec 11, 2013 at 4:51 PM, Imran Rashid <[email protected]> wrote:
>>
>>> these are just thoughts off the top of my head:
>>>
>>> 1) if the original R code runs in 3 secs, you are unlikely to be able to
>>> improve that drastically with spark. Yes, spark can run sub-second jobs,
>>> but no matter what, don't expect spark to get you into the < 10 millisecond
>>> range. While spark has much lower overhead than hadoop at running jobs,
>>> there is definitely still significant overhead. It's really meant to speed
>>> up bigger computations. (Still, I understand the desire to start w/
>>> something small to get a feel for it.)
>>>
>>> 2) Spark is doing a *lot* more work the way you have it coded now. I'm
>>> not entirely sure what is going on b/c I can't see what all the variables
>>> mean. But at the very least, the spark code is dealing w/ different data,
>>> since I see Strings are involved in the spark code.
>>>
>>> In the R code, can you tell me the meaning of & size of:
>>> x
>>> x2
>>> w
>>> h
>>>
>>> and in the scala code, similarly for:
>>> x2
>>> id_x
>>> id_w
>>> h
>>>
>>> The join() in spark is the main thing that is really slow -- b/c it's
>>> designed for big data sets, it involves serializing data to a file and
>>> reading it back. That is almost certainly what is taking a long time. And
>>> you are doing one join per element in x. Again, without knowing the
>>> meaning of each of those variables, it's hard for me to say another way of doing
>>> it ... given that the R code doesn't have any strings or joins involved,
>>> you probably don't need them in spark.
>>>
>>> 3) If the problem really is small, maybe it doesn't make sense to use
>>> spark (related to #1). Given the overhead associated w/ each task, you
>>> want them to be non-trivial amounts of work. E.g., maybe a better example
>>> of using spark to speed this example up would be doing lots of kernel
>>> density estimates w/ different bandwidths, and then choosing the best
>>> one. Pseudo-code:
>>>
>>> val data: Array[Double] = ...
>>> val bcData = sc.broadcast(data)
>>> val bandwidths = ... // e.g. (0 to 1000).map{_ / 1000.0}
>>>
>>> def computeEstimateAndGetError(data: Array[Double], bandwidth: Double): Double = {
>>>   // totally normal scala code goes here, not involving spark at all:
>>>   // compute the density estimate on part of the data, get the error on the
>>>   // rest of the data, and return the error.
>>>   // maybe even do it on a bunch of different samples of data, e.g. w/
>>>   // cross-validation
>>>   ...
>>>   // just return the error
>>> }
>>>
>>> // now use spark to search all bandwidths
>>> val (bestBandwidth, bestError) = sc.parallelize(bandwidths).map{ bandwidth =>
>>>   bandwidth -> computeEstimateAndGetError(bcData.value, bandwidth)
>>> }.collect.minBy{_._2}
>>>
>>> (still, maybe not a great example, since you could do something smarter
>>> than just a grid search, but hopefully that gives you an idea)
>>>
>>> 4) kind of a follow on to #3, but your first step might be to just write
>>> some normal scala code to really compute the exact same thing as R for
>>> comparison.
>>>
>>> 5) The original R code can be improved: You are copying ker a lot,
>>> because you keep appending only one element to it. Better to allocate it
>>> at the right size in the first place:
>>> ker <- numeric(length(x2))
>>> (depending on the size of x2, this could be a major bottleneck, as it
>>> requires length(x2)^2 operations)
>>>
>>>
>>>
>>>
>>> On Wed, Dec 11, 2013 at 3:26 PM, ABEL ALEJANDRO CORONADO IRUEGAS <
>>> [email protected]> wrote:
>>>
>>>> We are trying to implement a univariate kernel density estimation in
>>>> Spark. Our first step was to implement it in R from scratch; here is the
>>>> relevant code:
>>>>
>>>> ## R CODE
>>>> ## gaussian kernel
>>>> gau<-function(z)
>>>> {
>>>> k<- exp(-(z^2)/2)*(1/sqrt(2*pi))
>>>> k
>>>> }
>>>>
>>>> ker<-c()
>>>> for ( j in 1:length(x2))
>>>> { z<-(x2[j]-x)/h
>>>> ker[j]<-sum(w*gau(z))/(sum(w)*h)
>>>> }
>>>>
>>>> Then we implement similar ideas in the Spark Shell
>>>>
>>>> // SCALA-SPARK CODE
>>>>
>>>> def gau(z: Double) = {
>>>>   scala.math.exp(-scala.math.pow(z, 2) / 2) * (1 / scala.math.sqrt(2 * scala.math.Pi))
>>>> }
>>>>
>>>> def kernel(x2j: Double, id_x: RDD[(java.lang.String, Double)],
>>>>            id_w: RDD[(java.lang.String, Double)], h: Double) = {
>>>>   val z = id_x.mapValues(x => (x2j - x) / h)
>>>>   z.mapValues(gau(_)).join(id_w).map(x => x._2._1 * x._2._2).sum /
>>>>     (id_w.map(x => x._2).sum * h)
>>>> }
>>>>
>>>> val ker = x2.map(kernel(_,id_x,id_w,h))
>>>>
>>>> The problem is that R is faster (3 sec) than Spark (30 min) on the same
>>>> machine (quad core). I'm sure it is because we are not using Spark as well
>>>> as possible.
>>>>
>>>> Can anyone help us?
>>>>
>>>> Abel Coronado
>>>> @abxda
>>>>
>>>
>>>
>>
>