Re: executor delay in Spark

2016-04-27 Thread Mike Hynes
Hi Raghava,

I'm terribly sorry about the end of my last email; that garbled
sentence wasn't meant to exist. I wrote it on my phone, realized I
wouldn't realistically have time to look into another set of logs
deeply enough, and thought I had deleted it. Again, I'm very sorry for
the error.

I did peek at your code, though, and think you could try the following:
0. Your main method performs many actions, which will make it hard to
isolate a problem; I would recommend examining only *one* RDD at first,
rather than six.
1. There is a lot of repetition in reading the RDDs from text files
sequentially; if you factor those lines into two methods (one per RDD
type), you will at least have a single entry point to work with once you
make a simplified test program (see the sketch after this list).
2. In one place you persist an RDD, count it, immediately unpersist it,
and then count it again. I'm not acquainted with this idiom and don't
understand what it is meant to achieve; I suspect it could trigger
unusual garbage collection, which would only complicate your trace
debugging.
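
To make point 1 concrete, here is a rough sketch of such a helper; the
method name, its signature, and the "x|y" line format are assumptions based
on the snippets quoted later in this thread, not your actual code:

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical helper: a single entry point that reads a pair RDD from a
// text file, hash-partitions it, names it, and persists it.
def readPairRDD(sc: SparkContext,
                path: String,
                partitioner: HashPartitioner,
                name: String): RDD[(Int, Int)] = {
  sc.textFile(path)
    .map { line =>
      val Array(x, y) = line.split("\\|") // assumes the "x|y" format quoted below
      (y.toInt, x.toInt)
    }
    .partitionBy(partitioner)
    .setName(name)
    .persist(StorageLevel.MEMORY_AND_DISK)
}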

I've attached a python script that dumps relevant info from the Spark
JSON logs into a CSV for easier analysis in your language of choice;
hopefully it can aid in finer-grained debugging (the headers of the
fields it prints are listed in one of the functions).

Mike

On 4/25/16, Raghava Mutharaju  wrote:
> Mike,
>
> We ran our program with 16, 32 and 64 partitions. The behavior was the same
> as before with 8 partitions. It was mixed -- for some RDDs we see an
> all-nothing skew, but for others we see them getting split across the 2
> worker nodes. In some cases they start with an even split, and in later
> iterations it goes to an all-nothing split. Please find the logs attached.
>
> our program source code:
> https://github.com/raghavam/sparkel/blob/275ecbb901a58592d8a70a8568dd95c839d46ecc/src/main/scala/org/daselab/sparkel/SparkELDAGAnalysis.scala
>
> We put in persist() statements for different RDDs just to check their skew.
>
> @Jeff, setting minRegisteredResourcesRatio did not help. The behavior was
> the same as before.
>
> Thank you for your time.
>
> Regards,
> Raghava.
>
>
> On Sun, Apr 24, 2016 at 7:17 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Could you change numPartitions to {16, 32, 64} and run your program for
>> each to see how many partitions are allocated to each worker? Let's see
>> if
>> you experience an all-nothing imbalance that way; if so, my guess is that
>> something else is odd in your program logic or spark runtime environment,
>> but if not and your executors all receive at least *some* partitions,
>> then
>> I still wouldn't rule out effects of scheduling delay. It's a simple
>> test,
>> but it could give some insight.
>>
>> Mike
>>
>> This could still be a scheduling delay. If only one executor has *all* the
>> partitions, could you email me the log file? (If it's 10+ MB, just the first
>> few thousand lines are fine).
>> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" 
>> wrote:
>>
>>> Mike, All,
>>>
>>> It turns out that the second time we encountered the uneven-partition
>>> issue is not due to spark-submit. It was resolved with the change in
>>> placement of count().
>>>
>>> Case-1:
>>>
>>> val numPartitions = 8
>>> // read uAxioms from HDFS, use hash partitioner on it and persist it
>>> // read type1Axioms from HDFS, use hash partitioner on it and persist it
>>> currDeltaURule1 = type1Axioms.join(uAxioms)
>>>                              .values
>>>                              .distinct(numPartitions)
>>>                              .partitionBy(hashPartitioner)
>>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>>>                                  .count()
>>>
>>> 
>>>
>>> currDeltaURule1 RDD results in all the data on one node (there are 2
>>> worker nodes). If we move count(), the uneven partition issue is
>>> resolved.
>>>
>>> Case-2:
>>>
>>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>>>
>>> 
>>>
>>>  -- this rdd depends on currDeltaURule1 and it gets
>>> executed. This resolved the uneven partitioning issue.
>>>
>>> I don't see why moving an action to a later part of the code would
>>> affect the partitioning. Are there other factors at play here that
>>> affect the partitioning?

Re: executor delay in Spark

2016-04-24 Thread Mike Hynes
Could you change numPartitions to {16, 32, 64} and run your program for
each to see how many partitions are allocated to each worker? Let's see if
you experience an all-nothing imbalance that way; if so, my guess is that
something else is odd in your program logic or spark runtime environment,
but if not and your executors all receive at least *some* partitions, then
I still wouldn't rule out effects of scheduling delay. It's a simple test,
but it could give some insight.

Mike

This could still be a scheduling delay. If only one executor has *all* the
partitions, could you email me the log file? (If it's 10+ MB, just the first
few thousand lines are fine).
On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" 
wrote:

> Mike, All,
>
> It turns out that the second time we encountered the uneven-partition
> issue is not due to spark-submit. It was resolved with the change in
> placement of count().
>
> Case-1:
>
> val numPartitions = 8
> // read uAxioms from HDFS, use hash partitioner on it and persist it
> // read type1Axioms from HDFS, use hash partitioner on it and persist it
> currDeltaURule1 = type1Axioms.join(uAxioms)
>                              .values
>                              .distinct(numPartitions)
>                              .partitionBy(hashPartitioner)
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>                                  .count()
>
> 
>
> currDeltaURule1 RDD results in all the data on one node (there are 2
> worker nodes). If we move count(), the uneven partition issue is resolved.
>
> Case-2:
>
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>
> 
>
>  -- this rdd depends on currDeltaURule1 and it gets executed.
> This resolved the uneven partitioning issue.
>
> I don't see why moving an action to a later part of the code would
> affect the partitioning. Are there other factors at play here that affect
> the partitioning?
>
> (Inconsistent) uneven partitioning leads to one machine getting
> overburdened (memory and number of tasks). We see a clear improvement in
> runtime when the partitioning is even (which happens when count is moved).
>
> Any pointers toward figuring out this issue are much appreciated.
>
> Regards,
> Raghava.
>
>
>
>
> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Glad to hear that the problem was solvable! I have not seen delays of
>> this type for later stages in jobs run by spark-submit, but I do not think
>> it impossible if your stage has no lineage dependence on other RDDs.
>>
>> I'm CC'ing the dev list to report other users observing load imbalance
>> caused by unusual initial task scheduling. I don't know of ways to avoid
>> this other than creating a dummy task to synchronize the executors, but
>> hopefully someone from there can suggest other possibilities.
>>
>> Mike
>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" 
>> wrote:
>>
>>> Mike,
>>>
>>> It turns out the executor delay, as you mentioned, is the cause. After
>>> we introduced a dummy stage, partitioning was working fine. Does this delay
>>> happen during later stages as well? We noticed the same behavior
>>> (partitioning happens on spark-shell but not through spark-submit) at a
>>> later stage also.
>>>
>>> Apart from introducing a dummy stage or running it from spark-shell, is
>>> there any other option to fix this?
>>>
>>> Regards,
>>> Raghava.
>>>
>>>
>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>>
>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>> receive tasks in the first stage. The delay does not persist once the
>>>> executors have been synchronized.
>>>>
>>>> When the tasks are very short, as may be your case (relatively small
>>>> data and a simple map task like you have described), the 8 tasks in
>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>> the second executor won't have responded to the master before the
>>>> first 4 tasks on the first executor have completed.
>>>>
>>>> To see if this is the cause in your particular case, you could try the
>>>> following to confirm:
>>>> 1. Examine the starting times of the tasks alongside their executor
>>>> 2. Make a "dummy" stage execute before your real stages to
>>>> synchronize the executors by creating and materializing any random RDD
>>>> 3. Make the tasks longer, i.e. with some silly computational work.

Re: executor delay in Spark

2016-04-22 Thread Mike Hynes
Glad to hear that the problem was solvable! I have not seen delays of this
type for later stages in jobs run by spark-submit, but I do not think it
impossible if your stage has no lineage dependence on other RDDs.

I'm CC'ing the dev list to report other users observing load imbalance
caused by unusual initial task scheduling. I don't know of ways to avoid
this other than creating a dummy task to synchronize the executors, but
hopefully someone from there can suggest other possibilities.

Mike
On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" 
wrote:

> Mike,
>
> It turns out the executor delay, as you mentioned, is the cause. After we
> introduced a dummy stage, partitioning was working fine. Does this delay
> happen during later stages as well? We noticed the same behavior
> (partitioning happens on spark-shell but not through spark-submit) at a
> later stage also.
>
> Apart from introducing a dummy stage or running it from spark-shell, is
> there any other option to fix this?
>
> Regards,
> Raghava.
>
>
> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> When submitting a job with spark-submit, I've observed delays (up to
>> 1--2 seconds) for the executors to respond to the driver in order to
>> receive tasks in the first stage. The delay does not persist once the
>> executors have been synchronized.
>>
>> When the tasks are very short, as may be your case (relatively small
>> data and a simple map task like you have described), the 8 tasks in
>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>> the second executor won't have responded to the master before the
>> first 4 tasks on the first executor have completed.
>>
>> To see if this is the cause in your particular case, you could try the
>> following to confirm:
>> 1. Examine the starting times of the tasks alongside their
>> executor
>> 2. Make a "dummy" stage execute before your real stages to
>> synchronize the executors by creating and materializing any random RDD
>> 3. Make the tasks longer, i.e. with some silly computational work.
>>
>> Mike
>>
>>
>> On 4/17/16, Raghava Mutharaju  wrote:
>> > Yes, it's the same data.
>> >
>> > 1) The number of partitions is the same (8, which is an argument to the
>> > HashPartitioner). In the first case, these partitions are spread across
>> > both the worker nodes. In the second case, all the partitions are on the
>> > same node.
>> > 2) What resources would be of interest here? Scala shell takes the
>> default
>> > parameters since we use "bin/spark-shell --master " to run
>> the
>> > scala-shell. For the scala program, we do set some configuration options
>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>> > serializer.
>> >
>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>> > RAM. 1 executor runs on each worker node. The following configuration options
>> > are set for the scala program -- perhaps we should move it to the spark
>> > config file.
>> >
>> > Driver memory and executor memory are set to 12GB
>> > parallelism is set to 8
>> > Kryo serializer is used
>> > Number of retainedJobs and retainedStages has been increased to check
>> them
>> > in the UI.
>> >
>> > What information regarding Spark Context would be of interest here?
>> >
>> > Regards,
>> > Raghava.
>> >
>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar 
>> wrote:
>> >
>> >> If the data file is the same then it should have a similar distribution of
>> >> keys.
>> >> Few queries-
>> >>
>> >> 1. Did you compare the number of partitions in both the cases?
>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> >> Program being submitted?
>> >>
>> >> Also, can you please share the details of Spark Context, Environment
>> and
>> >> Executors when you run via Scala program?
>> >>
>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> >> m.vijayaragh...@gmail.com> wrote:
>> >>
>> >>> Hello All,
>> >>>
>> >>> We are using HashPartitioner in the following way on a 3 node cluster
>> (1
>> >>> master and 2 worker nodes).
>> >>>
>> >>> val u =
>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
>> >>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>> >>>
>> >>> u.count()

Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Mike Hynes
A HashPartitioner will indeed partition based on the key, but you
cannot know on *which* node that key will appear. Again, the RDD
partitions will not necessarily be distributed evenly across your
nodes because of the greedy scheduling of the first wave of tasks,
particularly if those tasks have durations less than the initial
executor delay. I recommend you look at your logs to verify if this is
happening to you.
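
As a minimal illustration (the partition count of 8 is just for the example),
the partition index for a key is reproducible, but its placement is not:

import org.apache.spark.HashPartitioner

// The partition index is a pure function of the key's hashCode and the
// number of partitions, so it is deterministic across runs...
val p = new HashPartitioner(8)
(0 until 16).foreach(k => println(s"key $k -> partition ${p.getPartition(k)}"))
// ...but which executor (and hence node) hosts a given partition is decided
// by the scheduler at run time, which is where the first-wave imbalance
// described above can appear.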

Mike

On 4/18/16, Anuj Kumar  wrote:
> Good point Mike +1
>
> On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> When submitting a job with spark-submit, I've observed delays (up to
>> 1--2 seconds) for the executors to respond to the driver in order to
>> receive tasks in the first stage. The delay does not persist once the
>> executors have been synchronized.
>>
>> When the tasks are very short, as may be your case (relatively small
>> data and a simple map task like you have described), the 8 tasks in
>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>> the second executor won't have responded to the master before the
>> first 4 tasks on the first executor have completed.
>>
>> To see if this is the cause in your particular case, you could try the
>> following to confirm:
>> 1. Examine the starting times of the tasks alongside their
>> executor
>> 2. Make a "dummy" stage execute before your real stages to
>> synchronize the executors by creating and materializing any random RDD
>> 3. Make the tasks longer, i.e. with some silly computational
>> work.
>>
>> Mike
>>
>>
>> On 4/17/16, Raghava Mutharaju  wrote:
>> > Yes, it's the same data.
>> >
>> > 1) The number of partitions is the same (8, which is an argument to
>> > the
>> > HashPartitioner). In the first case, these partitions are spread across
>> > both the worker nodes. In the second case, all the partitions are on
>> > the
>> > same node.
>> > 2) What resources would be of interest here? Scala shell takes the
>> default
>> > parameters since we use "bin/spark-shell --master " to run
>> the
>> > scala-shell. For the scala program, we do set some configuration
>> > options
>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>> > serializer.
>> >
>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>> > RAM. 1 executor runs on each worker node. The following configuration
>> > options
>> > are set for the scala program -- perhaps we should move it to the spark
>> > config file.
>> >
>> > Driver memory and executor memory are set to 12GB
>> > parallelism is set to 8
>> > Kryo serializer is used
>> > Number of retainedJobs and retainedStages has been increased to check
>> them
>> > in the UI.
>> >
>> > What information regarding Spark Context would be of interest here?
>> >
>> > Regards,
>> > Raghava.
>> >
>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar 
>> > wrote:
>> >
>> >> If the data file is the same then it should have a similar distribution of
>> >> keys.
>> >> Few queries-
>> >>
>> >> 1. Did you compare the number of partitions in both the cases?
>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> >> Program being submitted?
>> >>
>> >> Also, can you please share the details of Spark Context, Environment
>> >> and
>> >> Executors when you run via Scala program?
>> >>
>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> >> m.vijayaragh...@gmail.com> wrote:
>> >>
>> >>> Hello All,
>> >>>
>> >>> We are using HashPartitioner in the following way on a 3 node cluster
>> (1
>> >>> master and 2 worker nodes).
>> >>>
>> >>> val u =
>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
>> >>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>> >>>
>> >>> u.count()
>> >>>
>> >>> If we run this from the spark shell, the data (52 MB) is split across
>> >>> the
>> >>> two worker nodes. But if we put this in a scala program and run it,
>> then
>> >>> all the data goes to only one node. We have run it multiple times,
>> >>> but
>> >>> this
>> >>> behavior does not change. This seems strange.
>> >>>
>> >>> Is there some problem with the way we use HashPartitioner?
>> >>>
>> >>> Thanks in advance.
>> >>>
>> >>> Regards,
>> >>> Raghava.
>> >>>
>> >>
>> >>
>> >
>> >
>> > --
>> > Regards,
>> > Raghava
>> > http://raghavam.github.io
>> >
>>
>>
>> --
>> Thanks,
>> Mike
>>
>


-- 
Thanks,
Mike




Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Mike Hynes
When submitting a job with spark-submit, I've observed delays (up to
1--2 seconds) for the executors to respond to the driver in order to
receive tasks in the first stage. The delay does not persist once the
executors have been synchronized.

When the tasks are very short, as may be your case (relatively small
data and a simple map task like you have described), the 8 tasks in
your stage may be allocated to only 1 executor in 2 waves of 4, since
the second executor won't have responded to the master before the
first 4 tasks on the first executor have completed.

To see if this is the cause in your particular case, you could try the
following to confirm:
1. Examine the starting times of the tasks alongside their executor
2. Make a "dummy" stage execute before your real stages to
synchronize the executors by creating and materializing any random RDD
(see the sketch below)
3. Make the tasks longer, i.e. with some silly computational work.
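
A minimal sketch of suggestion 2, assuming `sc` is your SparkContext and the
numbers are placeholders to be matched to your real job:

// Materialize a throwaway RDD so that every executor has registered with
// the driver before the real stages are scheduled.
val numPartitions = 8
sc.parallelize(1 to 1000000, numPartitions)
  .map(_ * 2)   // trivial work; any computation will do
  .count()      // the action forces the dummy stage to run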

Mike


On 4/17/16, Raghava Mutharaju  wrote:
> Yes, it's the same data.
>
> 1) The number of partitions is the same (8, which is an argument to the
> HashPartitioner). In the first case, these partitions are spread across
> both the worker nodes. In the second case, all the partitions are on the
> same node.
> 2) What resources would be of interest here? Scala shell takes the default
> parameters since we use "bin/spark-shell --master " to run the
> scala-shell. For the scala program, we do set some configuration options
> such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> serializer.
>
> We are running this on Azure D3-v2 machines which have 4 cores and 14GB
> RAM. 1 executor runs on each worker node. The following configuration options
> are set for the scala program -- perhaps we should move it to the spark
> config file.
>
> Driver memory and executor memory are set to 12GB
> parallelism is set to 8
> Kryo serializer is used
> Number of retainedJobs and retainedStages has been increased to check them
> in the UI.
>
> What information regarding Spark Context would be of interest here?
>
> Regards,
> Raghava.
>
> On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar  wrote:
>
>> If the data file is the same then it should have a similar distribution of
>> keys.
>> Few queries-
>>
>> 1. Did you compare the number of partitions in both the cases?
>> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> Program being submitted?
>>
>> Also, can you please share the details of Spark Context, Environment and
>> Executors when you run via Scala program?
>>
>> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> m.vijayaragh...@gmail.com> wrote:
>>
>>> Hello All,
>>>
>>> We are using HashPartitioner in the following way on a 3 node cluster (1
>>> master and 2 worker nodes).
>>>
>>> val u =
>>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
>>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>>>
>>> u.count()
>>>
>>> If we run this from the spark shell, the data (52 MB) is split across
>>> the
>>> two worker nodes. But if we put this in a scala program and run it, then
>>> all the data goes to only one node. We have run it multiple times, but
>>> this
>>> behavior does not change. This seems strange.
>>>
>>> Is there some problem with the way we use HashPartitioner?
>>>
>>> Thanks in advance.
>>>
>>> Regards,
>>> Raghava.
>>>
>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>


-- 
Thanks,
Mike




Re: RDD Partitions not distributed evenly to executors

2016-04-06 Thread Mike Hynes
mbalance arises due to
the greedy nature of the scheduler when data locality is not a factor?

2a. Why are the task times in plot 1 decreasing so dramatically, but
not in plot 2?
2b. Could the decrease in time be due to just-in-time compilation?
2c. If so, why would the JIT occur only for the first case with many
partitions when the same amount of computational work is to be done in
both cases?

3. If an RDD is to be created in such a manner (i.e. initialized for,
say, an iterative algorithm, rather than by reading data from disk or
hdfs), what is the best practice to promote good load balancing? My
first idea would be to create the full RDD with 2x as many partitions
but then coalesce it down to half the number of partitions with the
shuffle flag set to true. Would that be reasonable?
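
In code, that idea would look roughly like the sketch below, assuming an
existing SparkContext sc, a hypothetical target of n partitions, and dummy
data standing in for the real block vectors:

val n = 64                                       // hypothetical target partition count
val rdd = sc.parallelize(0 until 2 * n, 2 * n)   // create with 2x as many partitions
  .map(i => (i, i.toDouble))                     // stand-in for the real initialization
  .coalesce(n, shuffle = true)                   // shuffle back down to n partitions
rdd.count()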

Thank you very much for your time, and I very much hope that someone
from the dev community who is familiar with the scheduler may be able
to clarify the above observations and questions.

Thanks,
Mike

P.S. Koert Kuipers: neither spark-defaults.sh setting impacted the
observed behaviour, but thank you kindly for your suggestions.


On 4/5/16, Khaled Ammar  wrote:
> I have a similar experience.
>
> Using 32 machines, I can see that the number of tasks (partitions) assigned to
> executors (machines) is not even. Moreover, the distribution changes every
> stage (iteration).
>
> I wonder why Spark needs to move partitions around anyway; shouldn't the
> scheduler reduce network (and other IO) overhead by reducing such
> relocation?
>
> Thanks,
> -Khaled
>
>
>
>
> On Mon, Apr 4, 2016 at 10:57 PM, Koert Kuipers  wrote:
>
>> can you try:
>> spark.shuffle.reduceLocality.enabled=false
>>
>> On Mon, Apr 4, 2016 at 8:17 PM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> Dear all,
>>>
>>> Thank you for your responses.
>>>
>>> Michael Slavitch:
>>> > Just to be sure:  Has spark-env.sh and spark-defaults.conf been
>>> correctly propagated to all nodes?  Are they identical?
>>> Yes; these files are stored on a shared memory directory accessible to
>>> all nodes.
>>>
>>> Koert Kuipers:
>>> > we ran into similar issues and it seems related to the new memory
>>> > management. can you try:
>>> > spark.memory.useLegacyMode = true
>>> I reran the exact same code with a restarted cluster using this
>>> modification, and did not observe any difference. The partitioning is
>>> still imbalanced.
>>>
>>> Ted Yu:
>>> > If the changes can be ported over to 1.6.1, do you mind reproducing
>>> > the
>>> issue there ?
>>> Since the spark.memory.useLegacyMode setting did not impact my code
>>> execution, I will have to change the Spark dependency back to earlier
>>> versions to see if the issue persists and get back to you.
>>>
>>> Meanwhile, if anyone else has any other ideas or experience, please let
>>> me know.
>>>
>>> Mike
>>>
>>> On 4/4/16, Koert Kuipers  wrote:
>>> > we ran into similar issues and it seems related to the new memory
>>> > management. can you try:
>>> > spark.memory.useLegacyMode = true
>>> >
>>> > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91m...@gmail.com> wrote:
>>> >
>>> >> [ CC'ing dev list since nearly identical questions have occurred in
>>> >> user list recently w/o resolution;
>>> >> c.f.:
>>> >>
>>> >>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>>> >>
>>> >>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
>>> >> ]
>>> >>
>>> >> Hello,
>>> >>
>>> >> In short, I'm reporting a problem concerning load imbalance of RDD
>>> >> partitions across a standalone cluster. Though there are 16 cores
>>> >> available per node, certain nodes will have >16 partitions, and some
>>> >> will correspondingly have <16 (and even 0).
>>> >>
>>> >> In more detail: I am running some scalability/performance tests for
>>> >> vector-type operations. The RDDs I'm considering are simple block
>>> >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
>>> >> are generated with a fixed number of elements given by some multiple
>>> >> of the available cores, and subsequently hash-partitioned by their
>>>

Re: RDD Partitions not distributed evenly to executors

2016-04-04 Thread Mike Hynes
Dear all,

Thank you for your responses.

Michael Slavitch:
> Just to be sure:  Has spark-env.sh and spark-defaults.conf been correctly 
> propagated to all nodes?  Are they identical?
Yes; these files are stored on a shared memory directory accessible to
all nodes.

Koert Kuipers:
> we ran into similar issues and it seems related to the new memory
> management. can you try:
> spark.memory.useLegacyMode = true
I reran the exact same code with a restarted cluster using this
modification, and did not observe any difference. The partitioning is
still imbalanced.
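
For reference, a sketch of one way to apply this setting (and the
spark.shuffle.reduceLocality.enabled flag suggested elsewhere in the thread)
programmatically; the application name is a placeholder, and the same keys
can go into spark-defaults.conf instead:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("partition-balance-test")                  // placeholder name
  .set("spark.memory.useLegacyMode", "true")             // setting tried above
  .set("spark.shuffle.reduceLocality.enabled", "false")  // other suggestion from the thread
val sc = new SparkContext(conf)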

Ted Yu:
> If the changes can be ported over to 1.6.1, do you mind reproducing the issue 
> there ?
Since the spark.memory.useLegacyMode setting did not impact my code
execution, I will have to change the Spark dependency back to earlier
versions to see if the issue persists and get back to you.

Meanwhile, if anyone else has any other ideas or experience, please let me know.

Mike

On 4/4/16, Koert Kuipers  wrote:
> we ran into similar issues and it seems related to the new memory
> management. can you try:
> spark.memory.useLegacyMode = true
>
> On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> [ CC'ing dev list since nearly identical questions have occurred in
>> user list recently w/o resolution;
>> c.f.:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
>> ]
>>
>> Hello,
>>
>> In short, I'm reporting a problem concerning load imbalance of RDD
>> partitions across a standalone cluster. Though there are 16 cores
>> available per node, certain nodes will have >16 partitions, and some
>> will correspondingly have <16 (and even 0).
>>
>> In more detail: I am running some scalability/performance tests for
>> vector-type operations. The RDDs I'm considering are simple block
>> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
>> are generated with a fixed number of elements given by some multiple
>> of the available cores, and subsequently hash-partitioned by their
>> integer block index.
>>
>> I have verified that the hash partitioning key distribution, as well
>> as the keys themselves, are both correct; the problem is truly that
>> the partitions are *not* evenly distributed across the nodes.
>>
>> For instance, here is a representative output for some stages and
>> tasks in an iterative program. This is a very simple test with 2
>> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
>> examples stages from the stderr log are stages 7 and 9:
>> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
>> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639
>>
>> When counting the location of the partitions on the compute nodes from
>> the stderr logs, however, you can clearly see the imbalance. Examples
>> lines are:
>> 13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196,
>> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
>> 13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197,
>> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
>> 13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198,
>> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
>>
>> Grep'ing the full set of above lines for each hostname, himrod-?,
>> shows the problem occurs in each stage. Below is the output, where the
>> number of partitions stored on each node is given alongside its
>> hostname as in (himrod-?,num_partitions):
>> Stage 7: (himrod-1,0) (himrod-2,64)
>> Stage 9: (himrod-1,16) (himrod-2,48)
>> Stage 12: (himrod-1,0) (himrod-2,64)
>> Stage 14: (himrod-1,16) (himrod-2,48)
>> The imbalance is also visible when the executor ID is used to count
>> the partitions operated on by executors.
>>
>> I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch
>> (but the modifications do not touch the scheduler, and are irrelevant
>> for these particular tests). Has something changed radically in 1.6+
>> that would make a previously (<=1.5) correct configuration go haywire?
>> Have new configuration settings been added of which I'm unaware that
>> could lead to this problem?
>>
>> Please let me know if others in the community have observed this, and
>> thank you for your time,
>> Mike
>>
>>
>>
>


-- 
Thanks,
Mike




RDD Partitions not distributed evenly to executors

2016-04-04 Thread Mike Hynes
[ CC'ing dev list since nearly identical questions have occurred in
user list recently w/o resolution;
c.f.:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
]

Hello,

In short, I'm reporting a problem concerning load imbalance of RDD
partitions across a standalone cluster. Though there are 16 cores
available per node, certain nodes will have >16 partitions, and some
will correspondingly have <16 (and even 0).

In more detail: I am running some scalability/performance tests for
vector-type operations. The RDDs I'm considering are simple block
vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
are generated with a fixed number of elements given by some multiple
of the available cores, and subsequently hash-partitioned by their
integer block index.
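
For concreteness, the generated RDDs look roughly like the following sketch,
assuming `sc` is the SparkContext; the block count, block length, and name
are placeholders, and DenseVector stands in for whichever Breeze vector type
is actually used:

import breeze.linalg.DenseVector
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

val numBlocks = 64    // some multiple of the available cores (placeholder)
val blockSize = 10000 // placeholder block length
val vectors: RDD[(Int, DenseVector[Double])] =
  sc.parallelize(0 until numBlocks, numBlocks)
    .map(i => (i, DenseVector.zeros[Double](blockSize)))  // block index -> block vector
    .partitionBy(new HashPartitioner(numBlocks))          // hash-partition by block index
    .setName("blockVectors")
    .persist()
vectors.count()  // materialize so the placement shows up in the logs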

I have verified that the hash partitioning key distribution, as well
as the keys themselves, are both correct; the problem is truly that
the partitions are *not* evenly distributed across the nodes.
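
One quick way to check the per-partition key distribution (independently of
where the partitions land) is a sketch like the following, reusing the
hypothetical vectors RDD from the sketch above:

val partitionSizes = vectors
  .mapPartitionsWithIndex(
    (idx, it) => Iterator((idx, it.size)),
    preservesPartitioning = true)
  .collect()
partitionSizes.sortBy(_._1).foreach { case (idx, n) =>
  println(s"partition $idx holds $n blocks")
}
// Which *host* each partition is computed on still has to come from the
// task-start lines in the stderr logs, as shown below.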

For instance, here is a representative output for some stages and
tasks in an iterative program. This is a very simple test with 2
nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
examples stages from the stderr log are stages 7 and 9:
7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639

When counting the location of the partitions on the compute nodes from
the stderr logs, however, you can clearly see the imbalance. Examples
lines are:
13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196,
himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197,
himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198,
himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&

Grep'ing the full set of above lines for each hostname, himrod-?,
shows the problem occurs in each stage. Below is the output, where the
number of partitions stored on each node is given alongside its
hostname as in (himrod-?,num_partitions):
Stage 7: (himrod-1,0) (himrod-2,64)
Stage 9: (himrod-1,16) (himrod-2,48)
Stage 12: (himrod-1,0) (himrod-2,64)
Stage 14: (himrod-1,16) (himrod-2,48)
The imbalance is also visible when the executor ID is used to count
the partitions operated on by executors.

I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch
(but the modifications do not touch the scheduler, and are irrelevant
for these particular tests). Has something changed radically in 1.6+
that would make a previously (<=1.5) correct configuration go haywire?
Have new configuration settings been added of which I'm unaware that
could lead to this problem?

Please let me know if others in the community have observed this, and
thank you for your time,
Mike
