How can you possibly expect anyone to give you meaningful advice when you
have told us nothing about what 'func' or 'proj' actually does? All I can
tell from your post is that you are using unusual syntax, but that on its
own isn't sufficient to cause a problem:
scala> val func: Int => Int = i => i + 1
func: Int => Int = <function1>
scala> val rdd = sc.parallelize(List(1, 2, 3, 4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> rdd.map { elem => {func.apply(elem)} }.collect
13/10/21 21:32:48 INFO spark.SparkContext: Starting job: collect at <console>:17
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:17) with 1 output partitions (allowLocal=false)
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at <console>:17)
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Parents of final stage: List()
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Missing parents: List()
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at <console>:17), which has no missing parents
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[1] at map at <console>:17)
13/10/21 21:32:48 INFO local.LocalTaskSetManager: Size of task 0 is 1848 bytes
13/10/21 21:32:48 INFO executor.Executor: Running task ID 0
13/10/21 21:32:48 INFO executor.Executor: Serialized size of result for 0 is 434
13/10/21 21:32:48 INFO executor.Executor: Sending result for 0 directly to driver
13/10/21 21:32:48 INFO local.LocalScheduler: Remove TaskSet 0.0 from pool
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
13/10/21 21:32:48 INFO executor.Executor: Finished task ID 0
13/10/21 21:32:48 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:17) finished in 0.127 s
13/10/21 21:32:48 INFO spark.SparkContext: Job finished: collect at <console>:17, took 0.219938 s
res0: Array[Int] = Array(2, 3, 4, 5)
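For what it's worth, the usual reason a map closure appears to run more
than once per element is recomputation, not map itself: the closure is
re-evaluated every time the RDD is recomputed, e.g. when you run a second
action without caching, or when a failed task is retried. Here is a
minimal sketch of how you could check for that in the same session; the
println is just a hypothetical stand-in for whatever logging led you to
count the calls:

val mapped = rdd.map { elem =>
  println("applying to " + elem) // side effect: fires on every evaluation
  func.apply(elem)
}
mapped.collect // prints once per element
mapped.collect // prints once per element again, unless mapped.cache()
               // was called before the first action

If the duplicate log lines disappear after a cache(), that's your answer;
if not, we really do need to see what func does.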
On Mon, Oct 21, 2013 at 9:18 PM, Wisc Forum <[email protected]> wrote:
> Hi, we have tried integrating Spark with our existing code and are
> seeing some issues.
>
> The issue is that when we use the function below (where func is a
> function that processes elem)
>
> rdd.map{ elem => {func.apply(elem)} }
>
> in the log I see that the apply function is called several times for the
> same element elem instead of once.
>
> When I execute this in a sequential way (see below), everything works just
> fine.
>
> sparkContext.parallelize(rdd.toArray.map(elem => proj.apply(elem)))
>
> (The only reason I used sparkContext.parallelize in the above line is
> that the method must return an RDD[MyDataType].)
>
> Why does this happen? Does the map function require something special
> from the RDD?
>
> Thanks,
> Xiaobing
>
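P.S. One note on the workaround in the quoted message, in case it helps:
rdd.toArray pulls every element back to the driver, so proj.apply runs
exactly once per element in plain local code before the results are
re-parallelized, whereas rdd.map ships the closure to the executors, where
it can be re-evaluated on recomputation or task retry. A rough sketch of
the two forms, assuming the proj: MyDataType => MyDataType from your code:

// applies proj once per element on the driver, then redistributes results
sparkContext.parallelize(rdd.toArray.map(elem => proj.apply(elem)))

// applies proj on the executors; may run again if the RDD is recomputed
rdd.map(elem => proj.apply(elem))

So if proj (or func) has side effects such as logging, the distributed
form can legitimately log more calls than there are elements, while still
yielding one result per element.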