Re: Partitions with zero records & variable task times
The article is interesting, but it doesn't really help - it has only one sentence about data distribution across partitions. How can I diagnose skewed data distribution? And how could evenly sized blocks in HDFS lead to skewed data in the first place?

On 9 Sep 2015 2:29 pm, "Akhil Das" wrote:

> This post here has a bit of information:
> http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/
>
> Thanks
> Best Regards
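Regarding the "How can I diagnose skewed data distribution?" question above: a minimal sketch of one way to check for key skew, assuming a hypothetical keyOf function that extracts the reduce key from a record (nothing here is from the original job):

    import org.apache.spark.rdd.RDD

    // Rough skew check: sample the data, count records per key, and list the
    // heaviest keys. `keyOf` stands in for the job's real key extraction.
    def topKeys[T](rdd: RDD[T], keyOf: T => String, n: Int = 20): Array[(String, Long)] =
      rdd.sample(withReplacement = false, fraction = 0.01)
        .map(record => (keyOf(record), 1L))
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)
        .take(n)

A few keys with counts far above the rest would mean the reduce keys themselves are skewed; uniformly small counts would point the problem somewhere other than key skew.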
Re: Partitions with zero records & variable task times
This post here has a bit of information:
http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/

Thanks
Best Regards

On Wed, Sep 9, 2015 at 6:44 AM, mark wrote:

> So something is missing in my understanding - what can cause some
> partitions to have zero records and others to have roughly equal-sized
> chunks (~50k records in this case)?
Re: Partitions with zero records & variable task times
As I understand things (maybe naively), my input data are stored in equal-sized blocks in HDFS, and each block becomes a partition in Spark when read from HDFS, so each partition should hold roughly the same number of records.

So something is missing in my understanding - what can cause some partitions to have zero records and others to have roughly equal-sized chunks (~50k records in this case)?

Before writing a custom partitioner, I would like to understand why the default partitioning has failed in my case.

On 8 Sep 2015 3:00 pm, "Akhil Das" wrote:

> Try using a custom partitioner for the keys so that they will get evenly
> distributed across tasks.
>
> Thanks
> Best Regards
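A custom partitioner along the lines suggested above is a small class; a minimal sketch, assuming plain hash spreading over a fixed partition count:

    import org.apache.spark.Partitioner

    // Minimal custom partitioner: spreads keys across numPartitions by hash
    // code. Note this behaves much like Spark's built-in HashPartitioner - a
    // custom implementation only pays off once getPartition encodes real
    // knowledge about the keys, e.g. splitting or salting known hot keys.
    class EvenSpreadPartitioner(override val numPartitions: Int) extends Partitioner {
      require(numPartitions > 0, "numPartitions must be positive")

      override def getPartition(key: Any): Int = {
        val mod = key.hashCode % numPartitions
        if (mod < 0) mod + numPartitions else mod // hashCode may be negative
      }
    }

It plugs into the reduce directly, e.g. pairRdd.reduceByKey(new EvenSpreadPartitioner(32), (a, b) => a + b), where pairRdd is a hypothetical RDD of (key, value) pairs.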
Re: Partitions with zero records & variable task times
Try using a custom partitioner for the keys so that they will get evenly distributed across tasks.

Thanks
Best Regards

On Fri, Sep 4, 2015 at 7:19 PM, mark wrote:

> I am trying to tune a Spark job and have noticed some strange behavior -
> tasks in a stage vary in execution time, ranging from 2 seconds to 20
> seconds. I assume tasks should all run in roughly the same amount of time
> in a well-tuned job.
>
> So I did some investigation - the fast tasks appear to have no records,
> whilst the slow tasks do. I need help understanding why this is happening.
>
> The code in the stage is pretty simple. All it does is:
>
> - filters records
> - maps records to a (key, record) tuple
> - reduces by key
>
> The data are Avro objects stored in Parquet files in 16MB blocks in HDFS.
>
> To establish how many records are in each partition, I added this snippet:
>
> import org.apache.spark.TaskContext
>
> // Count the records in each partition, tagging each count with the stage,
> // partition and task attempt it came from. Note that iter.size consumes
> // the iterator, so this is purely a diagnostic pass over the data.
> val counts = rdd.mapPartitions(iter => {
>   val ctx = TaskContext.get
>   val stageId = ctx.stageId
>   val partId = ctx.partitionId
>   val attemptId = ctx.taskAttemptId()
>   Iterator(Array(stageId, partId, attemptId, iter.size))
> }, preservesPartitioning = true).collect()
>
> which produces the following:
>
> 1 1 0 0
> 1 2 1 50489
> 1 3 2 0
> 1 4 3 0
> 1 5 4 0
> 1 6 5 53200
> 1 7 6 0
> 1 8 7 0
> 1 9 8 0
> 1 10 9 56946
> 1 11 10 0
> 1 12 11 0
> 1 13 12 0
> 1 14 13 59209
> 1 15 14 0
> 1 16 15 0
> 1 17 16 0
> 1 18 17 50202
> 1 19 18 0
> 1 20 19 0
> 1 21 20 0
> 1 22 21 54613
> 1 23 22 0
> 1 24 23 0
> 1 25 24 54157
> 1 26 25 0
> 1 27 26 0
> 1 28 27 0
> 1 29 28 53595
> 1 30 29 0
> 1 31 30 0
> 1 32 31 10750
>
> Looking at the logs, you can see that the tasks that contain records have
> the longest run times:
>
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0 (TID 26) in 2782 ms on DG1322 (6/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID 8) in 2815 ms on DG1322 (7/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0 (TID 20) in 2815 ms on DG1322 (8/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0 (TID 24) in 2840 ms on DG1321 (9/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0 (TID 30) in 2839 ms on DG1321 (10/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0 (TID 12) in 2878 ms on DG1321 (11/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0 (TID 31) in 2870 ms on DG1321 (12/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 18.0 in stage 1.0 (TID 19) in 2892 ms on DG1321 (13/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2930 ms on DG1321 (14/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 2934 ms on DG1321 (15/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 12.0 in stage 1.0 (TID 13) in 2931 ms on DG1321 (16/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 3246 ms on DG1323 (17/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 27.0 in stage 1.0 (TID 28) in 3226 ms on DG1323 (18/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 15.0 in stage 1.0 (TID 16) in 3249 ms on DG1323 (19/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 10.0 in stage 1.0 (TID 11) in 3669 ms on DG1323 (20/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 16.0 in stage 1.0 (TID 17) in 3666 ms on DG1323 (21/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 22.0 in stage 1.0 (TID 23) in 3664 ms on DG1323 (22/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 5) in 3692 ms on DG1323 (23/32)
> *15/09/03 16:26:39 INFO TaskSetManager: Finished task 31.0 in stage 1.0 (TID 32) in 6668 ms on DG1322 (24/32)*
> *15/09/03 16:26:48 INFO TaskSetManager: Finished task 17.0
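For reference, the stage described in the post above (filter, map to a (key, record) tuple, reduce by key) has roughly this shape; a minimal sketch, with a hypothetical Event record type and placeholder filter and reduce functions, since the thread does not say what the real job computes:

    import org.apache.spark.rdd.RDD

    // Hypothetical record type standing in for the Avro-generated class.
    case class Event(key: String, payload: String)

    def stage(events: RDD[Event]): RDD[(String, Event)] =
      events
        .filter(_.payload.nonEmpty)   // 1. filter records (placeholder predicate)
        .map(e => (e.key, e))         // 2. map to a (key, record) tuple
        .reduceByKey((a, _) => a)     // 3. reduce by key (placeholder: keep first)

With this shape, a task over an empty input partition does almost no work in the filter/map step, which matches the pattern in the logs above, where the fast tasks are exactly the ones that hold no records.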