Hmm… The good part of reduce is that it performs local combining within a
single partition automatically, but since you turned each partition into a
single-value one, local combining is not applicable, and reduce simply
degrades to collect and then perform a skyline over all collected partial
results on the driver side.


On Wed, Apr 16, 2014 at 2:43 PM, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:

> Yes, the second example does that. It transforms all the points of a
> partition into a single element the skyline, thus reduce will run on the
> skyline of two partitions and not on single points.
> Le 16 avr. 2014 06:47, "Yanzhe Chen" <yanzhe...@gmail.com> a écrit :
>
> Eugen,
>>
>> Thanks for your tip and I do want to merge the result of a partition with
>> another one but I am still not quite clear how to do it.
>>
>> Say the original data rdd has 32 partitions and since mapPartitionswon’t 
>> change the number of partitions, it will remain 32 partitions which
>> each contains the partial skyline of points in its partition. Now I want to
>> merge those 32 partitions to generate a new skyline. It will be better if I
>> can use reduce to merge each two of them (than just collect them in to
>> one), but I think simply calling reduce method on the rdd won’t work
>> because it reduce the data at the granularity of point rather than the
>> partition results (which is the collection of points). So is there a way to
>> reduce the data at the granularity of partitions?
>>
>> Thanks,
>>
>> Yanzhe
>>
>> On Wednesday, April 16, 2014 at 2:24 AM, Eugen Cepoi wrote:
>>
>> It depends on your algorithm but I guess that you probably should use
>> reduce (the code probably doesn't compile but it shows you the idea).
>>
>> val result = data.reduce { case (left, right) =>
>>   skyline(left ++ right)
>> }
>>
>> Or in the case you want to merge the result of a partition with another
>> one you could do:
>>
>> val result = data.mapPartitions { points =>
>>
>>                     // transforms all the partition into a single
>> element, but this may incur some other problems, especially if you use Kryo
>> serialization...
>>                     *Seq(skyline*(points.toArray))
>>                  }.reduce { case (left, right) =>
>>
>>                     skyline(left ++ right)
>>                  }
>>
>>
>>
>>
>> 2014-04-15 19:37 GMT+02:00 Cheng Lian <lian.cs....@gmail.com>:
>>
>> Your Spark solution first reduces partial results into a single
>> partition, computes the final result, and then collects to the driver side.
>> This involves a shuffle and two waves of network traffic. Instead, you can
>> directly collect partial results to the driver and then computes the final
>> results on driver side:
>>
>> val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))val 
>> partialResults = data.mapPartitions(points => 
>> skyline(points.toArray).iterator).collect()val results = 
>> skyline(partialResults)
>>
>> On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen <yanzhe...@gmail.com> wrote:
>>
>>  Hi all,
>>
>> As a previous thread, I am asking how to implement a divide-and-conquer
>> algorithm (skyline) in Spark.
>> Here is my current solution:
>>
>> val data = sc.textFile(…).map(line => line.split(“,”).map(_.toDouble))
>>
>> val result = data.mapPartitions(points => 
>> *skyline*(points.toArray).iterator).coalesce(1,
>> true)
>>                  .mapPartitions(points => *skyline*
>> (points.toArray).iterator).collect()
>>
>> where skyline is a local algorithm to compute the results:
>>
>> def *skyline*(points: Array[Point]) : Array[Point]
>>
>> Basically, I find this implement runs slower than the corresponding
>> Hadoop version (the identity map phase plus local skyline for both combine
>> and reduce phases).
>>
>> Below are my questions:
>>
>> 1. Why this implementation is much slower than the Hadoop one?
>>
>> I can find two possible reasons: one is the shuffle overhead in coalesce,
>> another is calling the toArray and iterator repeatedly when invoking
>> local skyline algorithm. But I am not sure which one is true.
>>
>> I haven’t seen your Hadoop version. But if this assumption is right, the
>> above version should help.
>>
>>
>> 2. One observation is that while Hadoop version almost used up all the
>> CPU resources during execution, the CPU seems not that hot on Spark. Is
>> that a clue to prove that the shuffling might be the real bottleneck?
>>
>> How many parallel tasks are there when running your Spark code? I doubt
>> tasks are queued and run sequentially.
>>
>>
>> 3. Is there any difference between coalesce(1, true) and reparation? It
>> seems that both opeartions need shuffling data. What’s the proper
>> situations using the coalesce method?
>>
>> repartition(n) is just an alias of coalesce(n, true), so yes, they both
>> involve data shuffling. coalesce can be used to shrink partition number
>> when dataset size shrinks dramatically after operations like filter. Say
>> you have an RDD containing 1TB of data with 100 partitions, after a
>> .filter(...) call, only 20GB data left, then you may want to coalesce to
>> 2 partitions rather than 100.
>>
>>
>> 4. More generally, I am trying to implementing some core geometry
>> computation operators on Spark (like skyline, convex hull etc). In my
>> understanding, since Spark is more capable of handling iterative
>> computations on dataset, the above solution apparently doesn’t exploit what
>> Spark is good at. Any comments on how to do geometry computations on Spark
>> (if it is possible) ?
>>
>> Although Spark is good at iterative algorithms, it also performs better
>> in batch computing due to much lower scheduling overhead and thread level
>> parallelism. Theoretically, you can always accelerate your MapReduce job by
>> rewriting it in Spark.
>>
>>
>> Thanks for any insight.
>>
>> Yanzhe
>>
>>
>>
>>

Reply via email to