Thank you, I have a year's worth of homework ;)

On Thu, Dec 12, 2013 at 2:51 PM, Imran Rashid <[email protected]> wrote:

> ah, got it, makes a lot more sense now.  I couldn't figure out what w was,
> I should have figured it was weights.
>
> As Evan suggested, using zip is almost certainly what you want.
>
> val pointsAndWeights: RDD[(Double,Double)] = ...
>
> zipping together id_x and id_w will give you exactly that, but maybe you
> can just create it directly in the first place, it depends on how you are
> creating id_x & id_w
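>
> (A sketch of the zip version -- this assumes id_x and id_w have the same
> number of partitions and the same element ordering, which zip requires:)
>
> val pointsAndWeights: RDD[(Double, Double)] =
>   id_x.map(_._2).zip(id_w.map(_._2))  // drop the string ids, pair each point with its weight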
>
> In some ways, a univariate kernel density estimate isn't the perfect task
> for spark, since every data point affects all of the 512 output points.
> But, since 512 is so small, there is a pretty good workaround.  My next
> suggestion would be to try to make this a "one-pass" computation -- as you
> look at each point, compute its effect on *all* 512 output positions. eg.:
>
>
> val outputPoints = x2.toArray
> val pointToOutputEffect: RDD[Array[Double]] =
>   pointsAndWeights.map { case (point, weight) =>
>     val ker = new Array[Double](512)
>     (0 until 512).foreach { idx =>
>       val z = (outputPoints(idx) - point) / h
>       ker(idx) = weight * gau(z)  // this point's weighted kernel value at this output position
>     }
>     ker  // the map function has to return the array (the per-point contribution)
>   }
>
> now we've got an RDD with 283,469 elements in it, each of which is an
> array of size 512.  Now we just want to do an element-wise sum of those
> arrays.  But, we want to make sure we do that sum distributed -- we
> don't just want to return all of those arrays to the driver and do the sum
> there.  There are a few different ways to do it:
>
> 1) use fold to combine the results within each partition (distributed),
> and then just add one array per partition on the driver (non-distributed).
>
> val kernel = pointToOutputEffect.fold(new Array[Double](512)) {
>   case (left, right) => left += right
> }
>
> (the += method doesn't exist on arrays, you'd have to write it)
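>
> (For what it's worth, here's a sketch of what that helper could look like --
> an element-wise in-place add that returns the left array, so it can be passed
> straight to fold:)
>
> def addInPlace(left: Array[Double], right: Array[Double]): Array[Double] = {
>   (0 until left.length).foreach { i => left(i) += right(i) }
>   left  // return the mutated array so it works as a fold/combine function
> }
>
> val kernel = pointToOutputEffect.fold(new Array[Double](512))(addInPlace _)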
>
> 2) use aggregate(), and just add the results from each data point directly
> into your sum array.  this should be much more efficient -- you don't have
> to allocate an array once per data point, which is a huge waste in the
> first approach.
>
> val kernel = pointsAndWeights.aggregate(new Array[Double](512))(
>   { case (ker, (point, weight)) =>
>     // given one point, add its contribution into the result we have so far
>     (0 until 512).foreach { idx =>
>       val z = (outputPoints(idx) - point) / h
>       ker(idx) += weight * gau(z)
>     }
>     ker  // the seqOp has to return the updated accumulator
>   },
>   { case (left, right) =>
>     // given two results from parts of the data, combine them element-wise
>     (0 until 512).foreach { idx => left(idx) += right(idx) }
>     left
>   }
> )
>
>
> 3) use accumulators -- sort of like a variant of aggregate(), but with the
> advantage of letting you accumulate multiple things in one pass.  This one
> is a bit more work -- you need to define a helper class which tells Spark how
> to merge together two Array[Double]s; it's a bit hard to follow, but a rough
> sketch is below.
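>
> (A rough, untested sketch of that helper, using the AccumulatorParam API:)
>
> object ArraySumParam extends org.apache.spark.AccumulatorParam[Array[Double]] {
>   // the "zero" for merging: an all-zeros array of the same length
>   def zero(initial: Array[Double]): Array[Double] = new Array[Double](initial.length)
>   // element-wise in-place add; returning `left` avoids an extra allocation
>   def addInPlace(left: Array[Double], right: Array[Double]): Array[Double] = {
>     (0 until left.length).foreach { i => left(i) += right(i) }
>     left
>   }
> }
>
> val kernelAcc = sc.accumulator(new Array[Double](512))(ArraySumParam)
>
> You'd then foreach over pointsAndWeights, build each point's 512-slot
> contribution array, and do kernelAcc += contribution -- though note that,
> like #1, this allocates an array per data point, which is part of why I'd
> still go with #2.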
>
>
> I'd probably go for #2.  I bet it will get you down to a few seconds :)
>
>
> In all of the approaches I've outlined, there is an important tradeoff in
> choosing how big each partition is.  If you make lots of small partitions,
> then you can make use of more distributed executors, and you'll also get more
> parallelism if some tasks are bigger, some executors are slower, etc.
> However, the driver (your "main" program that you use for launching spark)
> will have more work to do to collect all the results together and add them
> all up.  The fewer tasks you use, the less work the driver has to do to merge
> it all together.
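>
> (In practice the partition count is usually chosen where the RDD is created,
> e.g. something like this -- the path and the parsing here are just
> hypothetical:)
>
> val pointsAndWeights =
>   sc.textFile("hdfs://.../points_and_weights.tsv", 64)  // hypothetical path; ~64 partitions requested
>     .map { line => val f = line.split("\t"); (f(0).toDouble, f(1).toDouble) }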
>
> The code examples assume sum(w) = 1 -- there is some more bookkeeping you
> need to do if it isn't, hopefully you can add that in.  (Potentially a good
> use of an accumulator.)
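>
> (One way to do that bookkeeping, assuming `kernel` is the Array[Double] from
> approach #2: compute sum(w) -- here as an extra pass, though an accumulator
> could fold it into the same pass -- and apply the 1/(sum(w)*h) factor at the
> end, as in the R code:)
>
> val weightSum = pointsAndWeights.map(_._2).sum
> val density = kernel.map(_ / (weightSum * h))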
>
> Also, I've just been adding doubles together and assuming the numeric
> accuracy would be good enough, but you may need to consider something like:
> https://en.wikipedia.org/wiki/Kahan_summation_algorithm
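>
> (In case it's useful, a tiny sketch of the compensated-summation idea for a
> single running total; the same pattern would apply per output slot:)
>
> def kahanSum(values: Iterator[Double]): Double = {
>   var sum = 0.0
>   var c = 0.0  // running compensation for lost low-order bits
>   values.foreach { v =>
>     val y = v - c
>     val t = sum + y
>     c = (t - sum) - y  // recover what was lost when adding y to sum
>     sum = t
>   }
>   sum
> }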
>
> Finally, if you are tired of writing lots of + methods for arrays & for
> kahan summations etc., you might consider using something like algebird:
> https://github.com/twitter/algebird
> but, if you're new to scala, that will probably look like gibberish, don't
> worry about it.  it took me a year with scala before I started to
> appreciate it :)
>
>
>
> On Thu, Dec 12, 2013 at 1:16 PM, ABEL ALEJANDRO CORONADO IRUEGAS <
> [email protected]> wrote:
>
>> First of all, thank you very much, it is a vibrant community.
>>
>> Sorry for being so terse in my question.
>>
>>
>> The dataset is going to be bigger: we are processing the Mexican Economic
>> Census data, and some of the data sets are around millions of records. Maybe
>> that is not enough to call it Big Data, but it is big enough for R and other
>> traditional statistical software.
>>
>>
>> I'm at the National Statistical Office of México and we are doing a
>> proof of concept with Spark to see if it is a good solution for this kind
>> of problem in my institution, closer to statistical analysis as in R than
>> to data processing as in the Map-Reduce family. I think Spark is going to
>> mature in the near future in the MLlib direction, with more support for
>> vectorized operations "like" R, thinking of Spark as the R for Big Data.
>>
>>
>> Ok, thank you Imran, let me comment point by point:
>>
>> 1) Yes, you are right, we are getting a feel for Spark with a small data
>> set. Once the Spark implementation is stable enough we are going to
>> increase the size of the data set and the size of the cluster. But 30
>> minutes is a bad smell about my implementation of the algorithm.
>>
>> 2) Yes, I'm trying to understand the Spark way. In R everything is a
>> vector and of course nothing is clusterized. I see R as a means to
>> communicate algorithms and ideas with the statistical research area; they
>> are strong in R and in statistics.
>>
>> Next, the meanings of the R variables:
>>
>> x  # vector with 283,469 observations (Double) from which the density
>> estimation will be made
>>
>> x2 # vector of 512 points where the density will be estimated (Double)
>>
>> w # vector with 283,469 weights, one per observation (Double)
>>
>> h # the bandwidth, a single number (Double)
>>
>>
>>
>> in Scala
>>
>> id_x  // org.apache.spark.rdd.RDD[(java.lang.String, Double)] with
>> 283,469 observations, where the ugly String is the unconverted consecutive
>> integer that helps me join the vectors id_x and id_w.
>>
>> x2 // 512 values in scala.collection.immutable.NumericRange[Double],
>> where the density will be estimated
>>
>> id_w // the 283,469 weights with the ugly String id, corresponding one to
>> one with the id in id_x.
>>
>> h // One Double for the bandwidth
>>
>>
>>
>> As you can see, there are some places in the Scala algorithm where
>> element-by-element operations are needed, like this one:
>>
>>         val z = id_x.mapValues(x=>(x2j-x)/h)
>>
>>
>>
>> where z is a new vector with the element-by-element arithmetic
>> transformation of the id_x vector, which still carries the string id. Maybe,
>> as Evan says, at this point we could manually split the vector into jblas
>> subvectors and run vectorized operations to avoid the implicit
>> element-by-element iteration, but at the moment I don't see how to do that.
>>
>> The other place where element-by-element operations happen is here:
>>
>>         z.mapValues(gau(_)).join(id_w).map(x=>(x._2._1*x._2._2)).sum
>>
>>
>>
>> Here is where the most interesting thing happens in terms of parallelism
>> and where the optimizations could be done, but I don't know exactly how. The
>> idea is that an element-by-element operation between two big vectors is
>> needed; in R it's simple:
>>
>>         sum(w*gau(z))
>>
>>
>>
>> If I understand correctly, that's just what Evan recommends via the
>> ZippedRDD, and it would let me use the vectors directly without the ugly
>> string id.
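>>
>> Something like this, I guess (just a sketch, assuming both RDDs are
>> partitioned identically, for a single output point x2j):
>>
>> val xw = id_x.map(_._2).zip(id_w.map(_._2))
>> val numerator = xw.map { case (x, wi) => wi * gau((x2j - x) / h) }.sum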
>>
>>
>>
>> 3) Thanks for the idea.
>>
>> 4) Maybe we are close to finishing the Scala-Spark implementation and it
>> won't be necessary to do a pure Scala implementation.
>>
>> 5) Thank you, we are going to improve the R code.
>>
>>
>>
>> And finally, Evan, could you give some hints about how to pack the vectors
>> into a jblas DoubleMatrix, I mean how it is possible to control the
>> partitioning? I would really appreciate any direction on that.
>>
>> Thanks again to the Spark community!!!
>>
>>
>> On Wed, Dec 11, 2013 at 5:24 PM, Evan R. Sparks <[email protected]> wrote:
>>
>>> Agreed with Imran - without knowing the size/shape of the objects in
>>> your program, it's tough to tell where the bottleneck is. Additionally,
>>> unless the problem is really big (in terms of your w and x vectors), it's
>>> unlikely that you're going to be CPU bound on the cluster - communication
>>> and stragglers become the dominant effect here.
>>>
>>> I also agree that the join is causing a big bottleneck.
>>>
>>> Even after eliminating the join and with really big w and x vectors,
>>> though, you may still see spark run "slow" compared to your R program. This
>>> may be because many of the operations you use in R are vectorized, and with
>>> the spark code, you instead seem to be making your vectors a collection of
>>> (offset,value) pairs.
>>>
>>> There are several alternative strategies here:
>>> 1) Instead of a join, use a zip() if the two RDDs are the same size and
>>> are partitioned identically. This will be much faster than a join.
>>> 2) An alternative strategy that could allow you to get vectorized ops
>>> in spark while still operating in a distributed fashion is to pack your
>>> partitions into something like a jblas DoubleMatrix, one per partition,
>>> where each partition is a contiguous block of elements. Then you can apply
>>> your gaussian transformation and elementwise multiply, etc. using jblas,
>>> which links in blas - a low-level linear algebra library (not unlike what R
>>> is using under the covers) - for speed. (A rough sketch of this follows below.)
>>>
>>> Again - with 2 you could still do it as a zip.
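>>>
>>> (A rough, untested sketch of what that might look like for a single output
>>> point x2j, assuming pointsAndWeights is the zipped RDD[(Double, Double)] and
>>> h is the bandwidth:)
>>>
>>> import org.jblas.{DoubleMatrix, MatrixFunctions}
>>>
>>> val partials: RDD[Double] = pointsAndWeights.mapPartitions { iter =>
>>>   val pairs = iter.toArray
>>>   val x = new DoubleMatrix(pairs.map(_._1))   // this partition's points as one jblas vector
>>>   val w = new DoubleMatrix(pairs.map(_._2))   // the matching weights
>>>   val z = x.neg().add(x2j).div(h)             // (x2j - x) / h, vectorized
>>>   val k = MatrixFunctions.exp(z.mul(z).div(-2.0)).mul(1.0 / math.sqrt(2.0 * math.Pi))
>>>   Iterator(w.mul(k).sum())                    // partial sum of w * gau(z) for this partition
>>> }
>>> val numerator = partials.sum                  // add up the per-partition partial sums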
>>>
>>> For examples of using jblas with spark code, take a look at the mllib
>>> codebase in the spark repository on git.
>>>
>>> - Evan
>>>
>>>
>>> On Wed, Dec 11, 2013 at 4:51 PM, Imran Rashid <[email protected]> wrote:
>>>
>>>> these are just thoughts off the top of my head:
>>>>
>>>> 1) if the original R code runs in 3 secs, you are unlikely to be able
>>>> to improve that drastically with spark.  Yes, spark can run sub-second
>>>> jobs, but no matter what, don't expect spark to get you into the < 10
>>>> millisecond range.  While spark has much lower overhead than hadoop at
>>>> running jobs, there is definitely still significant overhead.  It's really
>>>> meant to speed up bigger computations.  (Still, I understand the desire to
>>>> start w/ something small to get a feel for it.)
>>>>
>>>> 2) Spark is doing a *lot* more work the way you have it coded now.  I'm
>>>> not entirely sure what is going on b/c I can't see what all the variables
>>>> mean.  But at the very least, the spark code is dealing w/ different data,
>>>> since I see Strings are involved in the spark code.
>>>>
>>>> In the R code, can you tell me the meaning of & size of:
>>>> x
>>>> x2
>>>> w
>>>> h
>>>>
>>>> and in the scala code, similarly for:
>>>> x2
>>>> id_x
>>>> id_w
>>>> h
>>>>
>>>> The join() in spark is the main thing that is really slow -- b/c it's
>>>> designed for big data sets, it involves serializing data to a file and
>>>> reading it back.  That is almost certainly what is taking a long time.  And
>>>> you are doing one join per element in x.  Again, without knowing the
>>>> meaning of each of those variables, it's hard for me to suggest another way
>>>> of doing it ... given that the R code doesn't have any strings or joins
>>>> involved, you probably don't need them in spark.
>>>>
>>>> 3) If the problem really is small, maybe it doesn't make sense to use
>>>> spark (related to #1).  Given the overhead associated w/ each task, you
>>>> want them to be non-trivial amounts of work.  Eg., maybe a better example
>>>> of using spark to speed this example up would be doing lots of kernel
>>>> density estimates w/ the different bandwidths, and then choosing the best
>>>> one.  pseudo-code:
>>>>
>>>> val data : Array[Double] = ...
>>>> val bcData = sc.broadcast(data)
>>>> val bandwidths = ... //eg (0 to 1000).map{_ / 1000.0}
>>>> def computeEstimateAndGetError(data: Array[Double], bandwidth: Double): Double = {
>>>>   // totally normal scala code goes here, not involving spark at all:
>>>>   // compute the density estimate on part of the data, get the error on
>>>>   // the rest of the data, and return the error
>>>>   // maybe even do it on a bunch of different samples of data, eg. w/ cross-validation
>>>>   ...
>>>>   // just return the error
>>>> }
>>>>
>>>> //now use spark to search all bandwidths
>>>> val (bestBandwidth, bestError) = sc.parallelize(bandwidths).map { bandwidth =>
>>>>   bandwidth -> computeEstimateAndGetError(bcData.value, bandwidth)
>>>> }.collect.minBy(_._2)
>>>>
>>>> (still, maybe not a great example, since you could do something smarter
>>>> than just a grid search, but hopefully that gives you an idea)
>>>>
>>>> 4) kind of a follow on to #3, but your first step might be to just
>>>> write some normal scala code to really compute the exact same thing as R
>>>> for comparison.
>>>>
>>>> 5) The original R code can be improved: you are copying ker a lot,
>>>> because you keep appending one element at a time.  It's better to allocate
>>>> it at the right size in the first place:
>>>> ker <- numeric(length(x2))
>>>> (depending on the size of x2, this could be a major bottleneck, as the
>>>> repeated appends require on the order of length(x2)^2 operations)
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Dec 11, 2013 at 3:26 PM, ABEL ALEJANDRO CORONADO IRUEGAS <
>>>> [email protected]> wrote:
>>>>
>>>>> We are trying to implement a univariate kernel density estimation in
>>>>> Spark. Our first step was to implement it in R from scratch; here is the
>>>>> relevant code:
>>>>>
>>>>> ## R CODE
>>>>> ## gaussian kernel
>>>>> gau<-function(z)
>>>>> {
>>>>>     k<- exp(-(z^2)/2)*(1/sqrt(2*pi))
>>>>>     k
>>>>> }
>>>>>
>>>>> ker<-c()
>>>>> for ( j in 1:length(x2))
>>>>> {  z<-(x2[j]-x)/h
>>>>>    ker[j]<-sum(w*gau(z))/(sum(w)*h)
>>>>> }
>>>>>
>>>>> Then we implemented similar ideas in the Spark shell:
>>>>>
>>>>> // SCALA-SPARK CODE
>>>>>
>>>>> def gau(z: Double) = {
>>>>>   scala.math.exp(-scala.math.pow(z, 2) / 2) * (1 / scala.math.sqrt(2 * scala.math.Pi))
>>>>> }
>>>>>
>>>>> def kernel(x2j: Double, id_x: RDD[(java.lang.String, Double)],
>>>>>            id_w: RDD[(java.lang.String, Double)], h: Double) = {
>>>>>   val z = id_x.mapValues(x => (x2j - x) / h)
>>>>>   z.mapValues(gau(_)).join(id_w).map(x => x._2._1 * x._2._2).sum /
>>>>>     (id_w.map(x => x._2).sum * h)
>>>>> }
>>>>>
>>>>> val ker = x2.map(kernel(_,id_x,id_w,h))
>>>>>
>>>>> The problem is that R is faster (3 sec) than Spark (30 min) on the
>>>>> same machine (quad core); I'm sure it is because we are not using Spark
>>>>> as well as possible.
>>>>>
>>>>> Can anyone help us?
>>>>>
>>>>> Abel Coronado
>>>>> @abxda
>>>>>
>>>>
>>>>
>>>
>>
>
