Hmm… The good part of reduce is that it performs local combining within a single partition automatically. But since you have turned each partition into a single-element one, there is nothing left to combine locally, and reduce simply degrades to a collect followed by a skyline over all the collected partial results on the driver side.
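Roughly, once every partition has been collapsed to one pre-computed skyline, the mapPartitions + reduce pipeline degenerates to the following (a sketch only, reusing the data RDD and the local skyline(points: Array[Point]): Array[Point] helper from the quoted mails below, and assuming Point is Array[Double]):

val partitionSkylines = data.mapPartitions(points => Iterator(skyline(points.toArray)))

// With one element per partition there is nothing to combine locally,
// so the merge effectively happens on the driver:
val result = skyline(partitionSkylines.collect().flatten)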
On Wed, Apr 16, 2014 at 2:43 PM, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:

> Yes, the second example does that. It transforms all the points of a
> partition into a single element, the skyline, so reduce will run on the
> skylines of two partitions rather than on single points.
>
> On Apr 16, 2014 06:47, "Yanzhe Chen" <yanzhe...@gmail.com> wrote:
>
>> Eugen,
>>
>> Thanks for your tip. I do want to merge the result of one partition with
>> another, but I am still not quite clear how to do it.
>>
>> Say the original data RDD has 32 partitions. Since mapPartitions won't
>> change the number of partitions, there will still be 32 partitions, each
>> containing the partial skyline of the points in that partition. Now I want
>> to merge those 32 partial results to generate a new skyline. It would be
>> better if I could use reduce to merge them two at a time (rather than just
>> collect them all into one place), but I think simply calling reduce on the
>> RDD won't work, because it reduces the data at the granularity of points
>> rather than partition results (which are collections of points). So is
>> there a way to reduce the data at the granularity of partitions?
>>
>> Thanks,
>>
>> Yanzhe
>>
>> On Wednesday, April 16, 2014 at 2:24 AM, Eugen Cepoi wrote:
>>
>> It depends on your algorithm, but I guess you probably should use reduce
>> (the code probably doesn't compile, but it shows you the idea):
>>
>> val result = data.reduce { case (left, right) =>
>>   skyline(left ++ right)
>> }
>>
>> Or, in the case where you want to merge the result of a partition with
>> another one, you could do:
>>
>> val result = data.mapPartitions { points =>
>>   // transforms the whole partition into a single element, but this may
>>   // incur some other problems, especially if you use Kryo serialization...
>>   Seq(skyline(points.toArray))
>> }.reduce { case (left, right) =>
>>   skyline(left ++ right)
>> }
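For reference, a type-checked variant of that second sketch might look like the following. This is only a sketch: it assumes Point is Array[Double] (the thread never pins it down) and the local skyline(points: Array[Point]): Array[Point] routine from the original mail; note that mapPartitions must return an Iterator:

type Point = Array[Double]  // assumed representation of a point

val data = sc.textFile("...")                      // "..." stands in for the real input path
  .map(line => line.split(",").map(_.toDouble))    // RDD[Point]

val result: Array[Point] = data
  .mapPartitions(points => Iterator(skyline(points.toArray)))  // one local skyline per partition
  .reduce((left, right) => skyline(left ++ right))             // merge partition skylines pairwise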
>> 2014-04-15 19:37 GMT+02:00 Cheng Lian <lian.cs....@gmail.com>:
>>
>> Your Spark solution first reduces partial results into a single
>> partition, computes the final result, and then collects it to the driver
>> side. This involves a shuffle and two waves of network traffic. Instead,
>> you can collect the partial results directly to the driver and compute the
>> final result there:
>>
>> val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
>> val partialResults =
>>   data.mapPartitions(points => skyline(points.toArray).iterator).collect()
>> val results = skyline(partialResults)
>>
>> On Wed, Apr 16, 2014 at 1:03 AM, Yanzhe Chen <yanzhe...@gmail.com> wrote:
>>
>> Hi all,
>>
>> As in a previous thread, I am asking how to implement a divide-and-conquer
>> algorithm (skyline) in Spark. Here is my current solution:
>>
>> val data = sc.textFile(...).map(line => line.split(",").map(_.toDouble))
>>
>> val result = data
>>   .mapPartitions(points => skyline(points.toArray).iterator)
>>   .coalesce(1, true)
>>   .mapPartitions(points => skyline(points.toArray).iterator)
>>   .collect()
>>
>> where skyline is a local algorithm that computes the result:
>>
>> def skyline(points: Array[Point]): Array[Point]
>>
>> Basically, I find that this implementation runs slower than the
>> corresponding Hadoop version (an identity map phase plus the local skyline
>> for both the combine and reduce phases).
>>
>> Below are my questions:
>>
>> 1. Why is this implementation much slower than the Hadoop one?
>>
>> I can think of two possible reasons: one is the shuffle overhead in
>> coalesce, the other is calling toArray and iterator repeatedly when
>> invoking the local skyline algorithm. But I am not sure which one is the
>> real cause.
>>
>> I haven't seen your Hadoop version, but if this assumption is right, the
>> version above should help.
>>
>> 2. One observation is that while the Hadoop version used up almost all the
>> CPU resources during execution, the CPU doesn't seem that busy under
>> Spark. Is that a clue that shuffling might be the real bottleneck?
>>
>> How many parallel tasks are there when running your Spark code? I suspect
>> tasks are being queued and run sequentially.
>>
>> 3. Is there any difference between coalesce(1, true) and repartition? It
>> seems that both operations need to shuffle data. What are the proper
>> situations for using the coalesce method?
>>
>> repartition(n) is just an alias for coalesce(n, true), so yes, they both
>> involve shuffling data. coalesce can be used to shrink the partition
>> number when the dataset size shrinks dramatically after operations like
>> filter. Say you have an RDD containing 1TB of data in 100 partitions;
>> after a .filter(...) call only 20GB is left, and you may then want to
>> coalesce to 2 partitions rather than 100 (see the small sketch at the end
>> of this message).
>>
>> 4. More generally, I am trying to implement some core geometry computation
>> operators on Spark (like skyline, convex hull, etc.). In my understanding,
>> since Spark is more capable of handling iterative computations on a
>> dataset, the above solution apparently doesn't exploit what Spark is good
>> at. Any comments on how to do geometry computations on Spark (if it is
>> possible)?
>>
>> Although Spark is good at iterative algorithms, it also performs better in
>> batch computing, due to much lower scheduling overhead and thread-level
>> parallelism. Theoretically, you can always accelerate your MapReduce job by
>> rewriting it in Spark.
>>
>> Thanks for any insight.
>>
>> Yanzhe
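Coming back to question 3 above, here is a tiny, hypothetical illustration of shrinking the partition count after a selective filter (the path and sizes are made up). coalesce(2) keeps the default shuffle = false and just merges existing partitions, whereas repartition(2), i.e. coalesce(2, true), would shuffle the remaining data:

val lines = sc.textFile("hdfs://.../large-input")        // say, ~100 partitions, ~1TB
val matched = lines.filter(_.contains("some-rare-key"))  // most records filtered out
val compacted = matched.coalesce(2)                      // 2 partitions, no shuffle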