The behavior you're seeing is by design, and it is very important to
understand why it happens, because it can lead to unexpected behavior in
several ways. I learned that the hard way. :-)

Spark collapses multiple transforms into a single "stage" wherever possible
(presumably for performance). The boundary between stages is a shuffle. In
your example there's no shuffle, so all transforms are collapsed into a
single stage. A stage runs one task per partition of its final RDD, so since
you coalesce into two partitions at the end, and there is only one stage,
that stage contains exactly two tasks.

It is important to note that coalesce will not cause a shuffle by default
(repartition will always cause a shuffle). However, you can force it to
shuffle by passing true as the second (optional) parameter, like so:

val rdd4 = rdd3.coalesce(2, shuffle = true)

Try this in Spark shell and you should see 100 tasks for the first stage
and 2 tasks for the second.
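If you want to compare the two variants side by side, something like the following sketch may help. It assumes Spark is on the classpath; ShuffledCoalesce, build, the app name and the local[4] master are hypothetical names picked for this example. It relies on toDebugString listing each RDD in the lineage by class name, so a shuffle shows up as a ShuffledRDD entry.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical names throughout; only the RDD calls come from Spark itself.
object ShuffledCoalesce {
  // Builds the map-then-coalesce chain, with or without a forced shuffle.
  def build(sc: SparkContext, shuffle: Boolean): RDD[Int] =
    sc.parallelize(1 to 1000, 100) // 100 partitions feed the map
      .map(_ + 1000)
      .coalesce(2, shuffle = shuffle)

  def main(args: Array[String]): Unit = {
    // Local master is an assumption so the sketch runs without a cluster.
    val sc = new SparkContext(
      new SparkConf().setAppName("shuffled-coalesce").setMaster("local[4]"))
    try {
      for (shuffle <- Seq(false, true)) {
        val rdd = build(sc, shuffle)
        println(s"shuffle=$shuffle partitions=${rdd.partitions.size}")
        // With shuffle = true the lineage contains a ShuffledRDD, which marks the stage boundary.
        println(rdd.toDebugString)
        // Run a job so the stage and task counts appear in the logs and the web UI.
        rdd.count()
      }
    } finally {
      sc.stop()
    }
  }
}
```

Both variants end with 2 partitions; the difference is only whether the map runs in its own stage ahead of the shuffle.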



On Tue, Jun 24, 2014 at 9:22 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Ah, here's a better hypothesis. Everything you are doing minus the save() is
> a transformation, not an action. Since nothing is actually triggered until
> the save(), Spark may be seeing that the lineage of operations ends with
> 2 partitions anyway and simplifies accordingly.
>
> Two suggestions you can try:
>
>    1. Remove the coalesce(2) and concatenate the files post-processing to
>    get the number of files you want. This will also ensure the save()
>    operation can be parallelized fully. I think this is the preferable
>    approach since it does not artificially reduce the parallelism of your
>    job at any stage.
>    2. Another thing you can try is the following:
>
>    val rdd1 = sc.sequenceFile(...)
>    val rdd2 = rdd1.coalesce(100)
>
>    val rdd3 = rdd2.map(...).cache() // cache this RDD
>    val some_count = rdd3.count() // force the map() to run and materialize the result
>
>    val rdd4 = rdd3.coalesce(2)
>    val rdd5 = rdd4.saveAsTextFile(...) // want only two output files
>
>    rdd3.unpersist()
>
>    This should let the map() run 100 tasks in parallel while giving you
>    only 2 output files. You'll get this at the cost of caching rdd3 in
>    memory by running the count().
>
> Nick
>
>
>  On Tue, Jun 24, 2014 at 8:47 PM, Alex Boisvert <alex.boisv...@gmail.com>
> wrote:
>
>> For the skeptics :), here's a version you can easily reproduce at home:
>>
>> val rdd1 = sc.parallelize(1 to 1000, 100) // force with 100 partitions
>> val rdd2 = rdd1.coalesce(100)
>> val rdd3 = rdd2 map { _ + 1000 }
>> val rdd4 = rdd3.coalesce(2)
>> rdd4.collect()
>>
>> You can see that everything runs as only 2 tasks ... :-/
>>
>> 2014-06-25 00:43:20,795 INFO org.apache.spark.SparkContext: Starting job:
>> collect at <console>:48
>> 2014-06-25 00:43:20,811 INFO org.apache.spark.scheduler.DAGScheduler: Got
>> job 0 (collect at <console>:48) with 2 output partitions (allowLocal=false)
>> 2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler:
>> Final stage: Stage 0 (collect at <console>:48)
>> 2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler:
>> Parents of final stage: List()
>> 2014-06-25 00:43:20,821 INFO org.apache.spark.scheduler.DAGScheduler:
>> Missing parents: List()
>> 2014-06-25 00:43:20,827 INFO org.apache.spark.scheduler.DAGScheduler:
>> Submitting Stage 0 (CoalescedRDD[11] at coalesce at <console>:45), which
>> has no missing parents
>> 2014-06-25 00:43:20,898 INFO org.apache.spark.scheduler.DAGScheduler:
>> Submitting 2 missing tasks from Stage 0 (CoalescedRDD[11] at coalesce at
>> <console>:45)
>> 2014-06-25 00:43:20,901 INFO
>> org.apache.spark.scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2
>> tasks
>> 2014-06-25 00:43:20,921 INFO org.apache.spark.scheduler.TaskSetManager:
>> Starting task 0.0:0 as TID 0 on executor 2: ip-10-226-98-211.ec2.internal
>> (PROCESS_LOCAL)
>> 2014-06-25 00:43:20,939 INFO org.apache.spark.scheduler.TaskSetManager:
>> Serialized task 0.0:0 as 6632 bytes in 16 ms
>> 2014-06-25 00:43:20,943 INFO org.apache.spark.scheduler.TaskSetManager:
>> Starting task 0.0:1 as TID 1 on executor 5: ip-10-13-132-153.ec2.internal
>> (PROCESS_LOCAL)
>> 2014-06-25 00:43:20,951 INFO org.apache.spark.scheduler.TaskSetManager:
>> Serialized task 0.0:1 as 6632 bytes in 8 ms
>> 2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager:
>> Finished TID 0 in 685 ms on ip-10-226-98-211.ec2.internal (progress: 1/2)
>> 2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager:
>> Finished TID 1 in 662 ms on ip-10-13-132-153.ec2.internal (progress: 2/2)
>> 2014-06-25 00:43:21,606 INFO org.apache.spark.scheduler.DAGScheduler:
>> Completed ResultTask(0, 0)
>> 2014-06-25 00:43:21,607 INFO
>> org.apache.spark.scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose
>> tasks have all completed, from pool
>> 2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.DAGScheduler:
>> Completed ResultTask(0, 1)
>> 2014-06-25 00:43:21,608 INFO org.apache.spark.scheduler.DAGScheduler:
>> Stage 0 (collect at <console>:48) finished in 0.693 s
>> 2014-06-25 00:43:21,616 INFO org.apache.spark.SparkContext: Job finished:
>> collect at <console>:48, took 0.821161249 s
>> res7: Array[Int] = Array(1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008,
>> 1009, 1010, 1011, 1012, 1013, 1014, 1015, 1016, 1017, 1018, 1019, 1020,
>> 1031, 1032, 1033, 1034, 1035, 1036, 1037, 1038, 1039, 1040, 1051, 1052,
>> 1053, 1054, 1055, 1056, 1057, 1058, 1059, 1060, 1081, 1082, 1083, 1084,
>> 1085, 1086, 1087, 1088, 1089, 1090, 1101, 1102, 1103, 1104, 1105, 1106,
>> 1107, 1108, 1109, 1110, 1121, 1122, 1123, 1124, 1125, 1126, 1127, 1128,
>> 1129, 1130, 1141, 1142, 1143, 1144, 1145, 1146, 1147, 1148, 1149, 1150,
>> 1161, 1162, 1163, 1164, 1165, 1166, 1167, 1168, 1169, 1170, 1181, 1182,
>> 1183, 1184, 1185, 1186, 1187, 1188, 1189, 1190, 1201, 1202, 1203, 1204,
>> 1205, 1206, 1207, 1208, 1209, 1210, 1221, 1222, 1223, 1224, 1225, 1226,
>> 1227, 1228, 1229, 1230, 1241, 1242, 1243, 1244, 1245, 1246, 1247, 1248,
>> 1249...
>>
>>
>>
>>
>> On Tue, Jun 24, 2014 at 5:39 PM, Alex Boisvert <alex.boisv...@gmail.com>
>> wrote:
>>
>>> Yes.
>>>
>>> scala> rawLogs.partitions.size
>>> res1: Int = 2171
>>>
>>>
>>>
>>> On Tue, Jun 24, 2014 at 4:00 PM, Mayur Rustagi <mayur.rust...@gmail.com>
>>> wrote:
>>>
>>>> To be clear, the number of map tasks is determined by the number of
>>>> partitions in the RDD, hence the suggestion by Nicholas.
>>>>
>>>> Mayur Rustagi
>>>> Ph: +1 (760) 203 3257
>>>> http://www.sigmoidanalytics.com
>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>
>>>>
>>>>
>>>> On Wed, Jun 25, 2014 at 4:17 AM, Nicholas Chammas <
>>>> nicholas.cham...@gmail.com> wrote:
>>>>
>>>>> So do you get 2171 as the output for that command? That command tells
>>>>> you how many partitions your RDD has, so it’s good to first confirm that
>>>>> rdd1 has as many partitions as you think it has.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jun 24, 2014 at 4:22 PM, Alex Boisvert <
>>>>> alex.boisv...@gmail.com> wrote:
>>>>>
>>>>>> It's actually a set of 2171 S3 files, with an average size of about
>>>>>> 18MB.
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 24, 2014 at 1:13 PM, Nicholas Chammas <
>>>>>> nicholas.cham...@gmail.com> wrote:
>>>>>>
>>>>>>> What do you get for rdd1._jrdd.splits().size()? You might think
>>>>>>> you’re getting > 100 partitions, but it may not be happening.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert <
>>>>>>> alex.boisv...@gmail.com> wrote:
>>>>>>>
>>>>>>>> With the following pseudo-code,
>>>>>>>>
>>>>>>>> val rdd1 = sc.sequenceFile(...) // has > 100 partitions
>>>>>>>> val rdd2 = rdd1.coalesce(100)
>>>>>>>> val rdd3 = rdd2 map { ... }
>>>>>>>> val rdd4 = rdd3.coalesce(2)
>>>>>>>> val rdd5 = rdd4.saveAsTextFile(...) // want only two output files
>>>>>>>>
>>>>>>>> I would expect the parallelism of the map() operation to be 100
>>>>>>>> concurrent tasks, and the parallelism of the save() operation to be 2.
>>>>>>>>
>>>>>>>> However, it appears the parallelism of the entire chain is 2 -- I
>>>>>>>> only see two tasks created for the save() operation and those tasks
>>>>>>>> appear to execute the map() operation as well.
>>>>>>>>
>>>>>>>> Assuming what I'm seeing is as-specified (meaning, how things are
>>>>>>>> meant to be), what's the recommended way to force a parallelism of 100
>>>>>>>> on the map() operation?
>>>>>>>>
>>>>>>>> thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io
