Re: Partitions with zero records & variable task times

2015-09-09 Thread mark
The article is interesting but doesn't really help. It has only one
sentence about data distribution in partitions.

How can I diagnose skewed data distribution?

How could evenly-sized blocks in HDFS lead to skewed data anyway?
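
Would something like this be a reasonable way to check the key distribution
before the reduce? (Just a sketch - keyedRdd stands in for the (key, record)
RDD built in that stage, and it assumes the number of distinct keys is small
enough to collect to the driver.)

// count records per key at the driver and print the heaviest keys
val keyCounts = keyedRdd.map(_._1).countByValue()
keyCounts.toSeq.sortBy(p => -p._2).take(20).foreach {
  case (k, n) => println(s"$k -> $n")
}
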
On 9 Sep 2015 2:29 pm, "Akhil Das"  wrote:

> This post here has a bit of information:
> http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/
>
> Thanks
> Best Regards
>
> On Wed, Sep 9, 2015 at 6:44 AM, mark  wrote:
>
>> As I understand things (maybe naively), my input data are stored in
>> equal-sized blocks in HDFS, and each block represents a partition within
>> Spark when read from HDFS, therefore each block should hold roughly the
>> same number of records.
>>
>> So something is missing in my understanding - what can cause some
>> partitions to have zero records and others to have roughly equal-sized
>> chunks (~50k in this case)?
>>
>> Before writing a custom partitioner, I would like to understand why the
>> default partitioner has failed in my case.
>> On 8 Sep 2015 3:00 pm, "Akhil Das"  wrote:
>>
>>> Try using a custom partitioner for the keys so that they will get evenly
>>> distributed across tasks
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Fri, Sep 4, 2015 at 7:19 PM, mark  wrote:
>>>
 I am trying to tune a Spark job and have noticed some strange behavior
 - tasks in a stage vary in execution time, ranging from 2 seconds to 20
 seconds. I assume tasks should all run in roughly the same amount of time
 in a well-tuned job.

 So I did some investigation - the fast tasks appear to have no records,
 whilst the slow tasks do. I need help understanding why this is happening.

 The code in the stage is pretty simple. All it does is:

 - filters records
 - maps records to a (key, record) tuple
 - reduces by key

 The data are Avro objects stored in Parquet files in 16MB blocks in
 HDFS.
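
 In outline (the function names here are placeholders, not the real ones), the
 stage is roughly:

 val reduced = records
   .filter(r => keepRecord(r))                  // filter records
   .map(r => (keyOf(r), r))                     // map to (key, record) tuple
   .reduceByKey((a, b) => mergeRecords(a, b))   // reduce by key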

 To establish how many records are in each partition, I added this snippet:

 import org.apache.spark.TaskContext

 // one element per partition: (stage ID, partition ID, task attempt ID, record count)
 val counts = rdd.mapPartitions(iter => {
   val ctx = TaskContext.get
   val stageId = ctx.stageId
   val partId = ctx.partitionId
   val attemptId = ctx.taskAttemptId()
   Array(Array(stageId, partId, attemptId, iter.size)).iterator
 }, preservesPartitioning = true).collect()

 Which produces the following (columns: stage ID, partition ID, task attempt ID, record count):

 1  1  0  0
 1  2  1  50489
 1  3  2  0
 1  4  3  0
 1  5  4  0
 1  6  5  53200
 1  7  6  0
 1  8  7  0
 1  9  8  0
 1  10   9  56946
 1  11   10   0
 1  12   11   0
 1  13   12   0
 1  14   13   59209
 1  15   14   0
 1  16   15   0
 1  17   16   0
 1  18   17   50202
 1  19   18   0
 1  20   19   0
 1  21   20   0
 1  22   21   54613
 1  23   22   0
 1  24   23   0
 1  25   24   54157
 1  26   25   0
 1  27   26   0
 1  28   27   0
 1  29   28   53595
 1  30   29   0
 1  31   30   0
 1  32   31   10750


 Looking at the logs, you can see the tasks that contain records have
 the longest run time:

 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0
 (TID 26) in 2782 ms on DG1322 (6/32)
 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0
 (TID 8) in 2815 ms on DG1322 (7/32)
 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0
 (TID 20) in 2815 ms on DG1322 (8/32)
 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0
 (TID 24) in 2840 ms on DG1321 (9/32)
 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0
 (TID 30) in 2839 ms on DG1321 (10/32)
 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0
 (TID 12) in 2878 ms on DG1321 (11/32)
 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0
 (TID 31) in 2870 ms on DG1321 (12/32)
 15/09/03 16:26:36 INFO 

Re: Partitions with zero records & variable task times

2015-09-08 Thread Akhil Das
This post here has a bit of information:
http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/

Thanks
Best Regards

On Wed, Sep 9, 2015 at 6:44 AM, mark  wrote:

> As I understand things (maybe naively), my input data are stored in
> equal-sized blocks in HDFS, and each block represents a partition within
> Spark when read from HDFS, therefore each block should hold roughly the
> same number of records.
>
> So something is missing in my understanding - what can cause some
> partitions to have zero records and others to have roughly equal-sized
> chunks (~50k in this case)?
>
> Before writing a custom partitioner, I would like to understand why the
> default partitioner has failed in my case.
> On 8 Sep 2015 3:00 pm, "Akhil Das"  wrote:
>
>> Try using a custom partitioner for the keys so that they will get evenly
>> distributed across tasks
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Sep 4, 2015 at 7:19 PM, mark  wrote:
>>
>>> I am trying to tune a Spark job and have noticed some strange behavior -
>>> tasks in a stage vary in execution time, ranging from 2 seconds to 20
>>> seconds. I assume tasks should all run in roughly the same amount of time
>>> in a well-tuned job.
>>>
>>> So I did some investigation - the fast tasks appear to have no records,
>>> whilst the slow tasks do. I need help understanding why this is happening.
>>>
>>> The code in the stage is pretty simple. All it does is:
>>>
>>> - filters records
>>> - maps records to a (key, record) tuple
>>> - reduces by key
>>>
>>> The data are Avro objects stored in Parquet files in 16MB blocks in HDFS.
>>>
>>> To establish how many records are in each partition, I added this snippet:
>>>
>>> import org.apache.spark.TaskContext
>>>
>>> // one element per partition: (stage ID, partition ID, task attempt ID, record count)
>>> val counts = rdd.mapPartitions(iter => {
>>>   val ctx = TaskContext.get
>>>   val stageId = ctx.stageId
>>>   val partId = ctx.partitionId
>>>   val attemptId = ctx.taskAttemptId()
>>>   Array(Array(stageId, partId, attemptId, iter.size)).iterator
>>> }, preservesPartitioning = true).collect()
>>>
>>> Which produces the following (columns: stage ID, partition ID, task
>>> attempt ID, record count):
>>>
>>> 1  1  0  0
>>> 1  2  1  50489
>>> 1  3  2  0
>>> 1  4  3  0
>>> 1  5  4  0
>>> 1  6  5  53200
>>> 1  7  6  0
>>> 1  8  7  0
>>> 1  9  8  0
>>> 1  10   9  56946
>>> 1  11   10   0
>>> 1  12   11   0
>>> 1  13   12   0
>>> 1  14   13   59209
>>> 1  15   14   0
>>> 1  16   15   0
>>> 1  17   16   0
>>> 1  18   17   50202
>>> 1  19   18   0
>>> 1  20   19   0
>>> 1  21   20   0
>>> 1  22   21   54613
>>> 1  23   22   0
>>> 1  24   23   0
>>> 1  25   24   54157
>>> 1  26   25   0
>>> 1  27   26   0
>>> 1  28   27   0
>>> 1  29   28   53595
>>> 1  30   29   0
>>> 1  31   30   0
>>> 1  32   31   10750
>>>
>>>
>>> Looking at the logs, you can see the tasks that contain records have the
>>> longest run time:
>>>
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0
>>> (TID 26) in 2782 ms on DG1322 (6/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0
>>> (TID 8) in 2815 ms on DG1322 (7/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0
>>> (TID 20) in 2815 ms on DG1322 (8/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0
>>> (TID 24) in 2840 ms on DG1321 (9/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0
>>> (TID 30) in 2839 ms on DG1321 (10/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0
>>> (TID 12) in 2878 ms on DG1321 (11/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0
>>> (TID 31) in 2870 ms on DG1321 (12/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 18.0 in stage 1.0
>>> (TID 19) in 2892 ms on DG1321 (13/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0
>>> (TID 1) in 2930 ms on DG1321 (14/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0
>>> (TID 7) in 2934 ms on DG1321 (15/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 12.0 in stage 1

Re: Partitions with zero records & variable task times

2015-09-08 Thread mark
As I understand things (maybe naively), my input data are stored in
equal-sized blocks in HDFS, and each block represents a partition within
Spark when read from HDFS, therefore each block should hold roughly the same
number of records.
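
For what it's worth, a quick way to confirm how the input was split (rawRdd
here stands for the RDD as read from HDFS):

val numInputPartitions = rawRdd.partitions.length
println(s"input partitions: $numInputPartitions")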

So something is missing in my understanding - what can cause some
partitions to have zero records and others to have roughly equal-sized
chunks (~50k in this case)?

Before writing a custom partitioner, I would like to understand why the
default partitioner has failed in my case.
On 8 Sep 2015 3:00 pm, "Akhil Das"  wrote:

> Try using a custom partitioner for the keys so that they will get evenly
> distributed across tasks
>
> Thanks
> Best Regards
>
> On Fri, Sep 4, 2015 at 7:19 PM, mark  wrote:
>
>> I am trying to tune a Spark job and have noticed some strange behavior -
>> tasks in a stage vary in execution time, ranging from 2 seconds to 20
>> seconds. I assume tasks should all run in roughly the same amount of time
>> in a well-tuned job.
>>
>> So I did some investigation - the fast tasks appear to have no records,
>> whilst the slow tasks do. I need help understanding why this is happening.
>>
>> The code in the stage is pretty simple. All it does is:
>>
>> - filters records
>> - maps records to a (key, record) tuple
>> - reduces by key
>>
>> The data are Avro objects stored in Parquet files in 16MB blocks in HDFS.
>>
>> To establish how many records are in each partition, I added this snippet:
>>
>> import org.apache.spark.TaskContext
>>
>> // one element per partition: (stage ID, partition ID, task attempt ID, record count)
>> val counts = rdd.mapPartitions(iter => {
>>   val ctx = TaskContext.get
>>   val stageId = ctx.stageId
>>   val partId = ctx.partitionId
>>   val attemptId = ctx.taskAttemptId()
>>   Array(Array(stageId, partId, attemptId, iter.size)).iterator
>> }, preservesPartitioning = true).collect()
>>
>> Which produces the following (columns: stage ID, partition ID, task
>> attempt ID, record count):
>>
>> 1  1  0  0
>> 1  2  1  50489
>> 1  3  2  0
>> 1  4  3  0
>> 1  5  4  0
>> 1  6  5  53200
>> 1  7  6  0
>> 1  8  7  0
>> 1  9  8  0
>> 1  10   9  56946
>> 1  11   10   0
>> 1  12   11   0
>> 1  13   12   0
>> 1  14   13   59209
>> 1  15   14   0
>> 1  16   15   0
>> 1  17   16   0
>> 1  18   17   50202
>> 1  19   18   0
>> 1  20   19   0
>> 1  21   20   0
>> 1  22   21   54613
>> 1  23   22   0
>> 1  24   23   0
>> 1  25   24   54157
>> 1  26   25   0
>> 1  27   26   0
>> 1  28   27   0
>> 1  29   28   53595
>> 1  30   29   0
>> 1  31   30   0
>> 1  32   31   10750
>>
>>
>> Looking at the logs, you can see the tasks that contain records have the
>> longest run time:
>>
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0
>> (TID 26) in 2782 ms on DG1322 (6/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0
>> (TID 8) in 2815 ms on DG1322 (7/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0
>> (TID 20) in 2815 ms on DG1322 (8/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0
>> (TID 24) in 2840 ms on DG1321 (9/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0
>> (TID 30) in 2839 ms on DG1321 (10/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0
>> (TID 12) in 2878 ms on DG1321 (11/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0
>> (TID 31) in 2870 ms on DG1321 (12/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 18.0 in stage 1.0
>> (TID 19) in 2892 ms on DG1321 (13/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0
>> (TID 1) in 2930 ms on DG1321 (14/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0
>> (TID 7) in 2934 ms on DG1321 (15/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 12.0 in stage 1.0
>> (TID 13) in 2931 ms on DG1321 (16/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 3.0 in stage 1.0
>> (TID 4) in 3246 ms on DG1323 (17/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 27.0 in stage 1.0
>> (TID 28) in 3226 ms on DG1323 (18/32)
>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 15.0 in stage 

Re: Partitions with zero records & variable task times

2015-09-08 Thread Akhil Das
Try using a custom partitioner for the keys so that they will get evenly
distributed across tasks.
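
Something along these lines (only a sketch - the getPartition shown here is
essentially what the default HashPartitioner already does, so you would swap
in whatever hashing or salting spreads your particular keys more evenly):

import org.apache.spark.Partitioner

class EvenKeyPartitioner(override val numPartitions: Int) extends Partitioner {
  // map a key to a non-negative partition index
  override def getPartition(key: Any): Int = {
    val h = key.hashCode % numPartitions
    if (h < 0) h + numPartitions else h
  }
}

// then pass it into the reduce, e.g.
// rdd.reduceByKey(new EvenKeyPartitioner(32), (a, b) => merge(a, b))  // merge is a placeholder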

Thanks
Best Regards

On Fri, Sep 4, 2015 at 7:19 PM, mark  wrote:

> I am trying to tune a Spark job and have noticed some strange behavior -
> tasks in a stage vary in execution time, ranging from 2 seconds to 20
> seconds. I assume tasks should all run in roughly the same amount of time
> in a well-tuned job.
>
> So I did some investigation - the fast tasks appear to have no records,
> whilst the slow tasks do. I need help understanding why this is happening.
>
> The code in the stage is pretty simple. All it does is:
>
> - filters records
> - maps records to a (key, record) tuple
> - reduces by key
>
> The data are Avro objects stored in Parquet files in 16MB blocks in HDFS.
>
> To establish how many records are in each partition, I added this snippet:
>
> import org.apache.spark.TaskContext
>
> // one element per partition: (stage ID, partition ID, task attempt ID, record count)
> val counts = rdd.mapPartitions(iter => {
>   val ctx = TaskContext.get
>   val stageId = ctx.stageId
>   val partId = ctx.partitionId
>   val attemptId = ctx.taskAttemptId()
>   Array(Array(stageId, partId, attemptId, iter.size)).iterator
> }, preservesPartitioning = true).collect()
>
> Which produces the following (columns: stage ID, partition ID, task
> attempt ID, record count):
>
> 1  1  0  0
> 1  2  1  50489
> 1  3  2  0
> 1  4  3  0
> 1  5  4  0
> 1  6  5  53200
> 1  7  6  0
> 1  8  7  0
> 1  9  8  0
> 1  10   9  56946
> 1  11   10   0
> 1  12   11   0
> 1  13   12   0
> 1  14   13   59209
> 1  15   14   0
> 1  16   15   0
> 1  17   16   0
> 1  18   17   50202
> 1  19   18   0
> 1  20   19   0
> 1  21   20   0
> 1  22   21   54613
> 1  23   22   0
> 1  24   23   0
> 1  25   24   54157
> 1  26   25   0
> 1  27   26   0
> 1  28   27   0
> 1  29   28   53595
> 1  30   29   0
> 1  31   30   0
> 1  32   31   10750
>
>
> Looking at the logs, you can see the tasks that contain records have the
> longest run time:
>
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0
> (TID 26) in 2782 ms on DG1322 (6/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID
> 8) in 2815 ms on DG1322 (7/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0
> (TID 20) in 2815 ms on DG1322 (8/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0
> (TID 24) in 2840 ms on DG1321 (9/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0
> (TID 30) in 2839 ms on DG1321 (10/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0
> (TID 12) in 2878 ms on DG1321 (11/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0
> (TID 31) in 2870 ms on DG1321 (12/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 18.0 in stage 1.0
> (TID 19) in 2892 ms on DG1321 (13/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID
> 1) in 2930 ms on DG1321 (14/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID
> 7) in 2934 ms on DG1321 (15/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 12.0 in stage 1.0
> (TID 13) in 2931 ms on DG1321 (16/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID
> 4) in 3246 ms on DG1323 (17/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 27.0 in stage 1.0
> (TID 28) in 3226 ms on DG1323 (18/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 15.0 in stage 1.0
> (TID 16) in 3249 ms on DG1323 (19/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 10.0 in stage 1.0
> (TID 11) in 3669 ms on DG1323 (20/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 16.0 in stage 1.0
> (TID 17) in 3666 ms on DG1323 (21/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 22.0 in stage 1.0
> (TID 23) in 3664 ms on DG1323 (22/32)
> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID
> 5) in 3692 ms on DG1323 (23/32)
> *15/09/03 16:26:39 INFO TaskSetManager: Finished task 31.0 in stage 1.0
> (TID 32) in 6668 ms on DG1322 (24/32)*
> *15/09/03 16:26:48 INFO TaskSetManager: Finished task 17.0