Re: RDD Partitions not distributed evenly to executors
Hello All (and Devs in particular),

Thank you again for your further responses. Please find below a detailed email that identifies what I believe is the cause of the partition imbalance problem, which occurs in Spark 1.5, 1.6, and a 2.0-SNAPSHOT. It is followed by questions for the members of the dev community with more intimate knowledge of the scheduler, so that they may confirm my guess at the cause and provide insight into how best to avoid the problem.

Attached to this email are Gantt-chart plots that show the task execution over elapsed time in a Spark program. This program was meant to investigate the simplest possible vector operation for block-vector data stored in RDDs of type RDD[(Int,Vector)].

In the Gantt plots, the tasks are shown as horizontal lines along the x-axis, which shows elapsed time. Each shaded region represents a single executor, such that all tasks managed by a single executor lie in a contiguous shaded region. The executors each managed 16 cores on 4 different compute nodes, and the tasks have been sorted and fit into 16 slots for each executor according to their chronological order, as determined by the task information in the event log for the program, such that the y-axis corresponds essentially to the unique core id, ranging from 1 to 64. The numbers running horizontally at the top of these plots are the stage numbers, as determined by the DAG scheduler.

In the program itself, two block vectors, v_1 and v_2, were created and copartitioned, cached, and then added together elementwise through a join operation on their block index keys. Stages 0 and 1 correspond to the map and count operations to create v_1; stages 2 and 3 correspond to the same operations on v_2; and stages 6 through 15 consist of identical count operations to materialize the vector v = v_1 + v_2, formed through a join on v_1 and v_2.
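For readers who want to build a similar Gantt plot, the slot-fitting step described above (sorting each executor's tasks chronologically and packing them into 16 slots) could be sketched roughly as follows. This is only an illustration: the function name `assign_slots` and the `(executor_id, launch_time, finish_time)` tuple layout are assumptions made here, not the script that produced the attached plots, and reading the times out of the event log is left out.

```python
import heapq

def assign_slots(tasks, slots_per_executor=16):
    """Pack tasks into per-executor slots in chronological order.

    tasks: iterable of (executor_id, launch_time, finish_time) tuples
           (hypothetical layout; times in any consistent unit).
    Returns a list of (executor_id, slot, launch_time, finish_time).
    """
    by_executor = {}
    for executor_id, launch, finish in tasks:
        by_executor.setdefault(executor_id, []).append((launch, finish))

    placed = []
    for executor_id in sorted(by_executor):
        # Min-heap of (time the slot becomes free, slot index).
        free = [(0, slot) for slot in range(slots_per_executor)]
        heapq.heapify(free)
        for launch, finish in sorted(by_executor[executor_id]):
            free_at, slot = heapq.heappop(free)
            if free_at > launch:
                # More tasks overlap than there are slots for this executor.
                raise ValueError("no free slot for task launched at %r" % (launch,))
            placed.append((executor_id, slot, launch, finish))
            heapq.heappush(free, (finish, slot))
    return placed

# A plot row (the "core id" on the y-axis) can then be computed as
# executor_index * slots_per_executor + slot + 1.
```

Each task is drawn as a horizontal segment from its launch time to its finish time on the row of its slot, which keeps all of an executor's tasks inside one contiguous band.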
The vectors v_1 and v_2 were initialized by first creating the keys using a sc.parallelize{0 to num_blocks - 1} operation, after which the keys were partitioned with a HashPartitioner. (Note that a dummy map {k => (k,k)} on the keys was done first so that the HashPartitioner could be used; the motivation for this was that, for large block vector RDDs, it would be better to hash partition the keys before generating the data.)

The size of the vectors is determined as a multiple of a fixed vector block size (the size of each sub-block) times the number of partitions, which is itself an integer multiple of the number of cores. That is, each partition has \gamma blocks, there are \alpha partitions per core, and each block has size 2^16.

The first plot, 02_4node_imbalance_spark1.6_run2.pdf, shows a representative run of the block vector addition program for \alpha = 4, \gamma = 4. A well-balanced partitioning would correspond to 4 partitions per core, such that each executor is managing 64 tasks. However, as you can see in stage 0, this does not occur: there is a large imbalance, where cores 46--64 have many more tasks to compute than the others.

Observing the order of the task assignment, I believe that what is happening here is that, due to the initial random delay of the executors in responding to/receiving master instructions, the driver is assigning more tasks to the executor whose initial wave of tasks finishes first. Since there is *no* data locality in stage 0 to factor into determining on which nodes the computation should occur, my understanding is that the driver will allocate the tasks greedily---hence the initial delay is crucial for allocating partitions evenly across the nodes.
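The greedy-allocation hypothesis above can be illustrated with a small toy model. The sketch below is not Spark's scheduler; it simply assumes that every task has the same duration, that there is no locality preference, and that the next pending task always goes to whichever core becomes free first. The function name `simulate_greedy` and its parameters are made up for this illustration.

```python
import heapq

def simulate_greedy(num_tasks, start_delays, cores_per_executor, task_duration):
    """Toy model of greedy task hand-out with no data locality.

    start_delays[i] is the time at which executor i first becomes available.
    Returns the number of tasks each executor ends up running.
    """
    # Min-heap of (time the core becomes free, executor index, core index).
    free = [(delay, executor, core)
            for executor, delay in enumerate(start_delays)
            for core in range(cores_per_executor)]
    heapq.heapify(free)

    counts = [0] * len(start_delays)
    for _ in range(num_tasks):
        free_at, executor, core = heapq.heappop(free)
        counts[executor] += 1
        heapq.heappush(free, (free_at + task_duration, executor, core))
    return counts

# Short tasks, one late executor: the late executor receives fewer tasks.
print(simulate_greedy(256, [0, 0, 0, 5], 16, 1))
# One task per core, tasks longer than the delay: every core gets one task.
print(simulate_greedy(64, [0, 0, 0, 5], 16, 10))
```

In this model an initial delay only skews the counts when tasks are short relative to the delay and there are several partitions per core; with one long task per core, each core still receives exactly one task.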
Furthermore, note that stage 2 (an identical vector initialization operation to stage 0) is well-balanced, since all of the executors completed tasks at approximately the same time and hence, without data locality being a factor, were assigned new tasks at the same rate. Also, note here that the individual task durations are *decreasing markedly* through stages 6--15 (again, all of which are identical), but that the stages are longer than need be due to the load imbalance of the tasks.

The second plot, 02_4node_balance_longer.pdf, shows a second version of this same program. The code is identical; however, the command-line input parameters have been changed such that there were 64 partitions (\alpha = 1 partition per core), an identical block size of 2^16, but \gamma = 16 blocks per partition---i.e. fewer yet larger partitions, such that the vector is the same size. Here, stages 0 and 2 are both evenly partitioned; since the tasks in these stages are longer than the initial executor delay, no imbalance is created. However, despite the better balance in partitions across the nodes, this program takes *longer* in total elapsed time, and the tasks do not seem to be getting shorter by the same proportion as in the previous test with more partitions.

Given the above, I would like to ask the following questions:

1. Is my inference correct that the partition imbalance aris
Re: RDD Partitions not distributed evenly to executors
I have a similar experience. Using 32 machines, I can see that the number of tasks (partitions) assigned to executors (machines) is not even. Moreover, the distribution changes every stage (iteration).

I wonder why Spark needs to move partitions around anyway; shouldn't the scheduler reduce network (and other IO) overhead by reducing such relocation?

Thanks,
-Khaled

On Mon, Apr 4, 2016 at 10:57 PM, Koert Kuipers wrote:
> can you try:
> spark.shuffle.reduceLocality.enabled=false
Re: RDD Partitions not distributed evenly to executors
can you try:
spark.shuffle.reduceLocality.enabled=false
Re: RDD Partitions not distributed evenly to executors
Dear all,

Thank you for your responses.

Michael Slavitch:
> Just to be sure: Has spark-env.sh and spark-defaults.conf been correctly
> propagated to all nodes? Are they identical?
Yes; these files are stored on a shared memory directory accessible to all nodes.

Koert Kuipers:
> we ran into similar issues and it seems related to the new memory
> management. can you try:
> spark.memory.useLegacyMode = true
I reran the exact same code with a restarted cluster using this modification, and did not observe any difference. The partitioning is still imbalanced.

Ted Yu:
> If the changes can be ported over to 1.6.1, do you mind reproducing the issue
> there ?
Since the spark.memory.useLegacyMode setting did not impact my code execution, I will have to change the Spark dependency back to earlier versions to see if the issue persists and get back to you.

Meanwhile, if anyone else has any other ideas or experience, please let me know.

Mike

--
Thanks,
Mike

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org
Re: RDD Partitions not distributed evenly to executors
we ran into similar issues and it seems related to the new memory management. can you try:
spark.memory.useLegacyMode = true
Re: RDD Partitions not distributed evenly to executors
bq. the modifications do not touch the scheduler

If the changes can be ported over to 1.6.1, do you mind reproducing the issue there ?

I ask because master branch changes very fast. It would be good to narrow the scope where the behavior you observed started showing.
Re: RDD Partitions not distributed evenly to executors
Just to be sure: Has spark-env.sh and spark-defaults.conf been correctly propagated to all nodes? Are they identical?
RDD Partitions not distributed evenly to executors
[ CC'ing dev list since nearly identical questions have occurred in
user list recently w/o resolution;
c.f.:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
]

Hello,

In short, I'm reporting a problem concerning load imbalance of RDD partitions across a standalone cluster. Though there are 16 cores available per node, certain nodes will have >16 partitions, and some will correspondingly have <16 (and even 0).

In more detail: I am running some scalability/performance tests for vector-type operations. The RDDs I'm considering are simple block vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs are generated with a fixed number of elements given by some multiple of the available cores, and subsequently hash-partitioned by their integer block index.

I have verified that the hash partitioning key distribution, as well as the keys themselves, are both correct; the problem is truly that the partitions are *not* evenly distributed across the nodes.

For instance, here is a representative output for some stages and tasks in an iterative program. This is a very simple test with 2 nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two example stages from the stderr log are stages 7 and 9:
7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639

When counting the location of the partitions on the compute nodes from the stderr logs, however, you can clearly see the imbalance. Example lines are:
13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196, himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197, himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198, himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&

Grep'ing the full set of above lines for each hostname, himrod-?, shows the problem occurs in each stage. Below is the output, where the number of partitions stored on each node is given alongside its hostname as in (himrod-?,num_partitions):
Stage 7: (himrod-1,0) (himrod-2,64)
Stage 9: (himrod-1,16) (himrod-2,48)
Stage 12: (himrod-1,0) (himrod-2,64)
Stage 14: (himrod-1,16) (himrod-2,48)
The imbalance is also visible when the executor ID is used to count the partitions operated on by executors.

I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch (but the modifications do not touch the scheduler, and are irrelevant for these particular tests). Has something changed radically in 1.6+ that would make a previously (<=1.5) correct configuration go haywire? Have new configuration settings been added of which I'm unaware that could lead to this problem?

Please let me know if others in the community have observed this, and thank you for your time,
Mike
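For anyone wanting to reproduce the per-host counts, the grep step described above could be approximated with a short script. This is a sketch that assumes the log lines look exactly like the three TaskSetManager examples quoted in this message (one line per task, with the hostname following the TID); the names `TASK_START` and `count_tasks_per_host` are introduced here for illustration only.

```python
import re
from collections import Counter

# Matches e.g. "Starting task 0.0 in stage 7.0 (TID 196, himrod-2, partition 0,..."
TASK_START = re.compile(
    r"Starting task \d+\.\d+ in stage (\d+)\.\d+ \(TID \d+, ([^,\s]+), partition \d+"
)

def count_tasks_per_host(lines):
    """Return a Counter keyed by (stage, hostname) of launched tasks."""
    counts = Counter()
    for line in lines:
        match = TASK_START.search(line)
        if match:
            counts[(int(match.group(1)), match.group(2))] += 1
    return counts

sample = [
    "13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196, himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&",
    "13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197, himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&",
    "13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198, himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&",
]
for (stage, host), n in sorted(count_tasks_per_host(sample).items()):
    print("Stage %d: (%s,%d)" % (stage, host, n))
```

Run over the full stderr log, the same function gives one count per (stage, host) pair; hosts that received no tasks in a stage simply do not appear, so a zero such as (himrod-1,0) has to be filled in from the list of known hostnames.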