Re: Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-18 Thread Eugen Cepoi
Hey Dmitriy, thanks for sharing your solution.

I have some more updates.

The problem shows up when a shuffle is involved. Using coalesce with shuffle=true
behaves like reduceByKey + a smaller number of partitions, except that the whole
save stage hangs. I am not sure yet whether it only happens with UnionRDD or
also for cogroup-like operations.

Changing spark.shuffle.blockTransferService to nio (the default pre 1.2)
solves the problem.
So it looks like this problem arises with the new netty-based implementation.
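
For reference, a minimal sketch of how that switch might be applied, assuming
the job builds its own SparkConf (the property can equally be passed via --conf
to spark-submit or set in spark-defaults.conf); the app name is a placeholder:

import org.apache.spark.{SparkConf, SparkContext}

object NioTransportWorkaround {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("union-reduceByKey-job") // placeholder name
      .set("spark.shuffle.blockTransferService", "nio") // revert to the pre-1.2 transport
    val sc = new SparkContext(conf)
    // ... run the job as before ...
    sc.stop()
  }
}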





Re: Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-17 Thread Eugen Cepoi
Doing the reduceByKey without changing the number of partitions and then doing
a coalesce works.
But the other version still hangs, without any information (while it works fine
with spark 1.1.1). The previous logs don't seem to be related to what happens.
I don't think this is a memory issue, as the GC time remains low and the
shuffle read is small. My guess is that it might be related to the high number
of initial partitions, but in that case shouldn't it fail for coalesce too...?
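
To make the two variants concrete, here is a hedged sketch; `keyed` stands for
A.flatMap(...).union(B).keyBy(f), and the key/value types and reduce function
are hypothetical placeholders, not the actual job code:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object SaveStageVariants {
  // Variant that hangs on 1.2.1: reduceByKey also shrinks the output to 32 partitions.
  def hangingVariant(keyed: RDD[(String, Long)], out: String): Unit =
    keyed.reduceByKey(_ + _, 32).map(_._2).saveAsTextFile(out)

  // Variant reported to work: keep the partition count in reduceByKey and
  // coalesce (without shuffle) afterwards.
  def workingVariant(keyed: RDD[(String, Long)], out: String): Unit =
    keyed.reduceByKey(_ + _).coalesce(32).map(_._2).saveAsTextFile(out)
}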

Does anyone have an idea where to look to find the source of the problem?

Thanks,
Eugen


Re: Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-17 Thread Dmitriy Lyubimov
FWIW, I observed similar behavior in a similar situation. I was able to work
around it by forcefully committing one of the rdds into cache right before the
union, and forcing that by executing take(1). Nothing else ever helped.

Seems like a yet-undiscovered 1.2.x thing.
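
A hedged sketch of that workaround, with placeholder inputs and output path
standing in for the real rdds and the real job:

import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}

object CacheBeforeUnion {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-before-union"))

    // Placeholder inputs standing in for rdd A (after its flatMap) and rdd B.
    val a = sc.parallelize(1 to 100000, 1000).map(i => (i % 32, 1L))
    val b = sc.parallelize(1 to 1000, 32).map(i => (i % 32, 1L))

    // Commit one of the rdds to cache right before the union, and force the
    // materialization with take(1).
    val aCached = a.cache()
    aCached.take(1)

    aCached.union(b)
      .reduceByKey(_ + _, 32)
      .saveAsTextFile("hdfs:///tmp/union-out") // placeholder output path

    sc.stop()
  }
}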


Re: Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-13 Thread Eugen Cepoi
Hmm, I increased it to 1024 but it doesn't help, I still have the same problem :(


Re: Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-13 Thread Ted Yu
Might be related: what's the value for spark.yarn.executor.memoryOverhead?

See SPARK-6085

Cheers




Re: Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-13 Thread Eugen Cepoi
The default one, 0.07 of the executor memory. I'll try increasing it and post
back the result.
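
For reference, a hedged sketch of how that override is typically expressed,
assuming it is set on the job's SparkConf (it can also be passed as --conf to
spark-submit); 1024 is just the value being tried here, not a recommendation:

import org.apache.spark.SparkConf

object MemoryOverheadConf {
  val conf = new SparkConf()
    .set("spark.executor.memory", "4250m")
    // MB of off-heap headroom YARN adds on top of the executor heap; overrides
    // the default of 0.07 * executor memory mentioned above.
    .set("spark.yarn.executor.memoryOverhead", "1024")
}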

Thanks






Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-13 Thread Eugen Cepoi
Hi,

I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1. Strangely,
the exact same code does work (after the upgrade) in the spark-shell. But this
information might be misleading, as it also works with 1.1.1...


*The job takes as input two data sets:*
 - rdd A of 170+ GB (with less it is hard to reproduce) and more than 11K partitions
 - rdd B of 100+ MB and 32 partitions

I run it via EMR over YARN and use 4*m3.xlarge compute nodes. I am not sure
the executor config is relevant here. Anyway, I tried both multiple small
executors with less RAM and the inverse.


*The job basically does this:*
A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save

After the flatMap, rdd A is much smaller, similar in size to B.

*Configs I used to run this job:*

storage.memoryFraction: 0
shuffle.memoryFraction: 0.5

akka.timeout 500
akka.frameSize 40

// this one also defines the memory used by the YARN application master, but I
// am not sure it matters here
driver.memory 5g
executor.memory 4250m

I have 7 executors with 2 cores.
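
As a reference point, a hedged sketch of how this setup would typically be
written out as the full spark.* properties, assuming the shorthand keys above
map to their standard names (values are the ones reported here, not
recommendations):

import org.apache.spark.SparkConf

object ReportedJobConf {
  val conf = new SparkConf()
    .set("spark.storage.memoryFraction", "0")   // nothing is cached in this job
    .set("spark.shuffle.memoryFraction", "0.5")
    .set("spark.akka.timeout", "500")
    .set("spark.akka.frameSize", "40")
    .set("spark.driver.memory", "5g")           // on YARN this is usually given to spark-submit instead
    .set("spark.executor.memory", "4250m")
    .set("spark.executor.instances", "7")
    .set("spark.executor.cores", "2")
}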

*What happens:*
The job produces two stages: keyBy and save. The keyBy stage runs fine and
produces a shuffle write of ~150mb. The save stage, where the shuffle read
occurs, hangs. The larger the initial dataset, the more tasks hang.

I did run it on much larger datasets with the same config/cluster but without
doing the union, and it worked fine.

*Some more infos and logs:*

Among the 4 nodes, 1 finished all its tasks and the running ones are on the
3 other nodes. But I am not sure this is useful information (one node that
completed all its work vs the others), as with some smaller datasets I manage
to get only one hanging task.

Here are the last parts of the executor logs that show some timeouts.

*An executor from node ip-10-182-98-220*

15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6
remote fetches in 66 ms
15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in
connection from /10.181.48.153:56806
java.io.IOException: Connection timed out


*An executor from node ip-10-181-103-186*

15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6
remote fetches in 20 ms
15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in
connection from /10.182.98.220:38784
java.io.IOException: Connection timed out

*An executor from node ip-10-181-48-153* (all the logs below belong to this node)

15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage
1.0 (TID 13860). 802 bytes result sent to driver
15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in
connection from /10.181.103.186:46381
java.io.IOException: Connection timed out

*Followed by many *

15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending
result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016,
chunkIndex=405},
buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data,
offset=8631, length=571}} to /10.181.103.186:46381; closing connection
java.nio.channels.ClosedChannelException

*with last one being*

15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending
result RpcResponse{requestId=7377187355282895939,
response=[B@6fcd0014} to /10.181.103.186:46381; closing connection
java.nio.channels.ClosedChannelException


The executors from the node that finished its tasks don't show anything
special.

Note that I don't cache anything, which is why I reduced storage.memoryFraction
to 0.
I see some of these, but I don't think they are related.

15/03/13 15:43:15 INFO storage.MemoryStore: Memory use = 0.0 B
(blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 0.0 B.
Storage limit = 0.0 B.


Sorry for the long email with possibly misleading info, but I hope it might
help to track down what happens and why it was working with spark 1.1.1.

Thanks,
Eugen