Thanks Daniel and Nicholas for the helpful responses.  I'll go with
coalesce(shuffle = true) and see how things go.


On Wed, Jun 25, 2014 at 8:19 AM, Daniel Siegmann <daniel.siegm...@velos.io>
wrote:

> The behavior you're seeing is by design, and it is VERY IMPORTANT to
> understand why this happens because it can cause unexpected behavior in
> various ways. I learned that the hard way. :-)
>
> Spark collapses multiple transforms into a single "stage" wherever
> possible (presumably for performance). The boundary between stages is a
> shuffle. In your example there's no shuffle, so all transforms are being
> collapsed into a single stage. Since you coalesce at the end into two
> partitions, and there is only one stage, that stage must contain two tasks.
>
> It is important to note that coalesce will not cause a shuffle by default
> (repartition will always cause a shuffle). However, you can force coalesce to
> shuffle by passing true as a second (optional) parameter, like so:
>
> val rdd4 = rdd3.coalesce(2, true)
>
> Try this in Spark shell and you should see 100 tasks for the first stage
> and 2 tasks for the second.
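>
> If you want to see where the stage boundary falls without waiting for a job,
> rdd.toDebugString prints the lineage. Here's a minimal sketch you can paste
> into spark-shell (the 1-to-1000 range is just dummy data):
>
> val rdd1 = sc.parallelize(1 to 1000, 100)
> val rdd3 = rdd1.map(_ + 1000)
> val noShuffle = rdd3.coalesce(2)          // no shuffle: a single stage with 2 tasks total
> val withShuffle = rdd3.coalesce(2, true)  // shuffle: 100 map-side tasks, then 2 result tasks
> println(noShuffle.toDebugString)          // plain CoalescedRDD lineage, no shuffle dependency
> println(withShuffle.toDebugString)        // should include a ShuffledRDD, marking the stage split
> withShuffle.collect()                     // the logs/UI should now show two stages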
>
>
>
> On Tue, Jun 24, 2014 at 9:22 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Ah, here's a better hypothesis. Everything you are doing minus the save() is
>> a transformation, not an action. Since nothing is actually triggered until
>> the save(), Spark may be seeing that the lineage of operations ends with
>> 2 partitions anyway and simplifies accordingly.
>>
>> Two suggestions you can try:
>>
>>    1. Remove the coalesce(2) and concatenate the output files as a
>>    post-processing step to get the number of files you want. This will also
>>    ensure the save() operation can be parallelized fully. I think this is
>>    the preferable approach since it does not artificially reduce the
>>    parallelism of your job at any stage. (See the copyMerge sketch after
>>    this list for one way to do the concatenation.)
>>    2. Another thing you can try is the following:
>>
>>    val rdd1 = sc.sequenceFile(...)
>>    val rdd2 = rdd1.coalesce(100)
>>
>>    val rdd3 = rdd2.map(...).cache() // cache this RDD
>>    val some_count = rdd3.count() // force the map() to run and materialize the result
>>
>>    val rdd4 = rdd3.coalesce(2)
>>    rdd4.saveAsTextFile(...) // want only two output files (saveAsTextFile is an action, so it returns Unit)
>>
>>    rdd3.unpersist()
>>
>>    This should let the map() run 100 tasks in parallel while giving you
>>    only 2 output files. You'll get this at the cost of caching rdd3 in
>>    memory by running the count().
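>>
>>    On the post-processing in option 1: if your output lives on a Hadoop
>>    FileSystem (HDFS, or S3 through the Hadoop FS API), FileUtil.copyMerge
>>    (Hadoop 1.x/2.x) can concatenate every file in a directory into a single
>>    destination file. A rough sketch, not tested; the two paths are made-up
>>    placeholders you'd replace with your own:
>>
>>    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
>>
>>    val conf = sc.hadoopConfiguration
>>    val fs = FileSystem.get(conf)
>>    // concatenate the part-* outputs under the source directory into one file
>>    // ("/tmp/output-parts" and "/tmp/output-merged" are example paths only)
>>    FileUtil.copyMerge(fs, new Path("/tmp/output-parts"),
>>                       fs, new Path("/tmp/output-merged"),
>>                       false, conf, null)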
>>
>> Nick
>>
>>
>>  On Tue, Jun 24, 2014 at 8:47 PM, Alex Boisvert <alex.boisv...@gmail.com>
>> wrote:
>>
>>> For the skeptics :), here's a version you can easily reproduce at home:
>>>
>>> val rdd1 = sc.parallelize(1 to 1000, 100) // force with 100 partitions
>>> val rdd2 = rdd1.coalesce(100)
>>> val rdd3 = rdd2 map { _ + 1000 }
>>> val rdd4 = rdd3.coalesce(2)
>>> rdd4.collect()
>>>
>>> You can see that everything runs as only 2 tasks ... :-/
>>>
>>> 2014-06-25 00:43:20,795 INFO org.apache.spark.SparkContext: Starting
>>> job: collect at <console>:48
>>> 2014-06-25 00:43:20,811 INFO org.apache.spark.scheduler.DAGScheduler:
>>> Got job 0 (collect at <console>:48) with 2 output partitions
>>> (allowLocal=false)
>>> 2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler:
>>> Final stage: Stage 0 (collect at <console>:48)
>>> 2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler:
>>> Parents of final stage: List()
>>> 2014-06-25 00:43:20,821 INFO org.apache.spark.scheduler.DAGScheduler:
>>> Missing parents: List()
>>> 2014-06-25 00:43:20,827 INFO org.apache.spark.scheduler.DAGScheduler:
>>> Submitting Stage 0 (CoalescedRDD[11] at coalesce at <console>:45), which
>>> has no missing parents
>>> 2014-06-25 00:43:20,898 INFO org.apache.spark.scheduler.DAGScheduler:
>>> Submitting 2 missing tasks from Stage 0 (CoalescedRDD[11] at coalesce at
>>> <console>:45)
>>> 2014-06-25 00:43:20,901 INFO
>>> org.apache.spark.scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2
>>> tasks
>>> 2014-06-25 00:43:20,921 INFO org.apache.spark.scheduler.TaskSetManager:
>>> Starting task 0.0:0 as TID 0 on executor 2: ip-10-226-98-211.ec2.internal
>>> (PROCESS_LOCAL)
>>> 2014-06-25 00:43:20,939 INFO org.apache.spark.scheduler.TaskSetManager:
>>> Serialized task 0.0:0 as 6632 bytes in 16 ms
>>> 2014-06-25 00:43:20,943 INFO org.apache.spark.scheduler.TaskSetManager:
>>> Starting task 0.0:1 as TID 1 on executor 5: ip-10-13-132-153.ec2.internal
>>> (PROCESS_LOCAL)
>>> 2014-06-25 00:43:20,951 INFO org.apache.spark.scheduler.TaskSetManager:
>>> Serialized task 0.0:1 as 6632 bytes in 8 ms
>>> 2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager:
>>> Finished TID 0 in 685 ms on ip-10-226-98-211.ec2.internal (progress: 1/2)
>>> 2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager:
>>> Finished TID 1 in 662 ms on ip-10-13-132-153.ec2.internal (progress: 2/2)
>>> 2014-06-25 00:43:21,606 INFO org.apache.spark.scheduler.DAGScheduler:
>>> Completed ResultTask(0, 0)
>>> 2014-06-25 00:43:21,607 INFO
>>> org.apache.spark.scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose
>>> tasks have all completed, from pool
>>> 2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.DAGScheduler:
>>> Completed ResultTask(0, 1)
>>> 2014-06-25 00:43:21,608 INFO org.apache.spark.scheduler.DAGScheduler:
>>> Stage 0 (collect at <console>:48) finished in 0.693 s
>>> 2014-06-25 00:43:21,616 INFO org.apache.spark.SparkContext: Job
>>> finished: collect at <console>:48, took 0.821161249 s
>>> res7: Array[Int] = Array(1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008,
>>> 1009, 1010, 1011, 1012, 1013, 1014, 1015, 1016, 1017, 1018, 1019, 1020,
>>> 1031, 1032, 1033, 1034, 1035, 1036, 1037, 1038, 1039, 1040, 1051, 1052,
>>> 1053, 1054, 1055, 1056, 1057, 1058, 1059, 1060, 1081, 1082, 1083, 1084,
>>> 1085, 1086, 1087, 1088, 1089, 1090, 1101, 1102, 1103, 1104, 1105, 1106,
>>> 1107, 1108, 1109, 1110, 1121, 1122, 1123, 1124, 1125, 1126, 1127, 1128,
>>> 1129, 1130, 1141, 1142, 1143, 1144, 1145, 1146, 1147, 1148, 1149, 1150,
>>> 1161, 1162, 1163, 1164, 1165, 1166, 1167, 1168, 1169, 1170, 1181, 1182,
>>> 1183, 1184, 1185, 1186, 1187, 1188, 1189, 1190, 1201, 1202, 1203, 1204,
>>> 1205, 1206, 1207, 1208, 1209, 1210, 1221, 1222, 1223, 1224, 1225, 1226,
>>> 1227, 1228, 1229, 1230, 1241, 1242, 1243, 1244, 1245, 1246, 1247, 1248,
>>> 1249...
>>>
>>>
>>>
>>>
>>> On Tue, Jun 24, 2014 at 5:39 PM, Alex Boisvert <alex.boisv...@gmail.com>
>>> wrote:
>>>
>>>> Yes.
>>>>
>>>> scala> rawLogs.partitions.size
>>>> res1: Int = 2171
>>>>
>>>>
>>>>
>>>> On Tue, Jun 24, 2014 at 4:00 PM, Mayur Rustagi <mayur.rust...@gmail.com
>>>> > wrote:
>>>>
>>>>> To be clear, the number of map tasks is determined by the number of
>>>>> partitions in the RDD, hence Nicholas's suggestion.
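>>>>>
>>>>> A toy illustration in spark-shell (the numbers are arbitrary):
>>>>>
>>>>> val rdd = sc.parallelize(1 to 1000, 8) // 8 partitions
>>>>> rdd.partitions.size                    // Int = 8
>>>>> rdd.map(_ * 2).count()                 // the map + count runs as 8 tasks in a single stage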
>>>>>
>>>>> Mayur Rustagi
>>>>> Ph: +1 (760) 203 3257
>>>>> http://www.sigmoidanalytics.com
>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jun 25, 2014 at 4:17 AM, Nicholas Chammas <
>>>>> nicholas.cham...@gmail.com> wrote:
>>>>>
>>>>>> So do you get 2171 as the output for that command? That command
>>>>>> tells you how many partitions your RDD has, so it’s good to first confirm
>>>>>> that rdd1 has as many partitions as you think it has.
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 24, 2014 at 4:22 PM, Alex Boisvert <
>>>>>> alex.boisv...@gmail.com> wrote:
>>>>>>
>>>>>>> It's actually a set of 2171 S3 files, with an average size of about
>>>>>>> 18MB.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 24, 2014 at 1:13 PM, Nicholas Chammas <
>>>>>>> nicholas.cham...@gmail.com> wrote:
>>>>>>>
>>>>>>>> What do you get for rdd1._jrdd.splits().size()? You might think
>>>>>>>> you’re getting > 100 partitions, but that may not actually be the case.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert <
>>>>>>>> alex.boisv...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> With the following pseudo-code,
>>>>>>>>>
>>>>>>>>> val rdd1 = sc.sequenceFile(...) // has > 100 partitions
>>>>>>>>> val rdd2 = rdd1.coalesce(100)
>>>>>>>>> val rdd3 = rdd2 map { ... }
>>>>>>>>> val rdd4 = rdd3.coalesce(2)
>>>>>>>>> rdd4.saveAsTextFile(...) // want only two output files
>>>>>>>>>
>>>>>>>>> I would expect the parallelism of the map() operation to be 100
>>>>>>>>> concurrent tasks, and the parallelism of the save() operation to be 2.
>>>>>>>>>
>>>>>>>>> However, it appears the parallelism of the entire chain is 2 -- I
>>>>>>>>> only see two tasks created for the save() operation and those tasks 
>>>>>>>>> appear
>>>>>>>>> to execute the map() operation as well.
>>>>>>>>>
>>>>>>>>> Assuming what I'm seeing is as-specified (meaning, how things are
>>>>>>>>> meant to be), what's the recommended way to force a parallelism of 
>>>>>>>>> 100 on
>>>>>>>>> the map() operation?
>>>>>>>>>
>>>>>>>>> thanks!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Daniel Siegmann, Software Developer
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
> E: daniel.siegm...@velos.io W: www.velos.io
>
