Re: RDD collect hangs on large input data

2015-04-17 Thread Zsolt Tóth
Thanks for your answer, Imran. I haven't tried your suggestions yet, but
setting spark.shuffle.blockTransferService=nio solved my issue. There is a
JIRA for this: https://issues.apache.org/jira/browse/SPARK-6962.
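For reference, the same workaround can be applied programmatically instead of on the command line; a minimal PySpark sketch (the app name is made up, and note this setting only exists in Spark 1.x -- the NIO transfer service was removed in later releases):

```python
from pyspark import SparkConf, SparkContext

# Workaround from this thread: fall back to the NIO block transfer
# service instead of the default netty-based one (Spark 1.x only).
conf = (SparkConf()
        .setAppName("collect-hang-workaround")
        .set("spark.shuffle.blockTransferService", "nio"))
sc = SparkContext(conf=conf)
```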

Zsolt

2015-04-14 21:57 GMT+02:00 Imran Rashid :

> Is it possible that when you switch to the bigger data set, your data is
> skewed, so that some tasks generate far more data?  reduceByKey could
> result in a huge amount of data going to a small number of tasks.  I'd
> suggest:
>
> (a) seeing what happens if you don't collect() -- e.g. instead try writing
> to HDFS with saveAsObjectFile.
> (b) taking a look at what is happening on the executors with the
> long-running tasks.  You can get thread dumps via the UI (or you can log in
> to the boxes and use jstack).  This might point to some of your code that
> is taking a long time, or it might point to Spark internals.
>
> On Wed, Apr 8, 2015 at 3:45 AM, Zsolt Tóth 
> wrote:
>
>> I use EMR 3.3.1 which comes with Java 7. Do you think that this may cause
>> the issue? Did you test it with Java 8?
>>
>
>
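The skew point above can be made concrete without Spark: with reduceByKey, each distinct key's records end up in one reduce task, so one hot key means one overloaded task. A plain-Python sketch (the helper name and data are made up for illustration):

```python
from collections import Counter

def skew_ratio(keys):
    """Max per-key load divided by mean per-key load -- a rough proxy for
    how unevenly reduceByKey work would spread across reduce tasks."""
    counts = Counter(keys)
    mean = sum(counts.values()) / len(counts)
    return max(counts.values()) / mean

balanced = ["a", "b", "c", "d"] * 25     # every key carries equal load
skewed = ["a"] * 97 + ["b", "c", "d"]    # one key dominates

print(skew_ratio(balanced))  # 1.0  -> work is evenly spread
print(skew_ratio(skewed))    # 3.88 -> one task does ~4x the average work
```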


Re: RDD collect hangs on large input data

2015-04-08 Thread Zsolt Tóth
I use EMR 3.3.1 which comes with Java 7. Do you think that this may cause
the issue? Did you test it with Java 8?


Re: RDD collect hangs on large input data

2015-04-07 Thread Jon Chase
Zsolt - what version of Java are you running?

On Mon, Mar 30, 2015 at 7:12 AM, Zsolt Tóth 
wrote:

> Thanks for your answer!
> [rest of quoted message and stack trace trimmed; see the original message
> below]

Re: RDD collect hangs on large input data

2015-03-30 Thread Zsolt Tóth
Thanks for your answer!
I don't call .collect() to trigger the execution; I call it because I need
the RDD on the driver. It's not a huge RDD, and it's no larger than the one
returned with the 50GB input data.

The end of the log is below. The two IPs belong to the two worker nodes; I
think they can't connect to the driver after they've finished their part of
the collect().

15/03/30 10:38:25 INFO executor.Executor: Finished task 872.0 in stage 1.0 (TID 1745). 1414 bytes result sent to driver
15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(200) called with curMem=405753, maxMem=4883742720
15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_867 stored as values in memory (estimated size 200.0 B, free 4.5 GB)
15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(80) called with curMem=405953, maxMem=4883742720
15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_868 stored as values in memory (estimated size 80.0 B, free 4.5 GB)
15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_867
15/03/30 10:38:25 INFO executor.Executor: Finished task 867.0 in stage 1.0 (TID 1740). 1440 bytes result sent to driver
15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_868
15/03/30 10:38:25 INFO executor.Executor: Finished task 868.0 in stage 1.0 (TID 1741). 1422 bytes result sent to driver
15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:42026
java.io.IOException: Connection timed out
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:41703
java.io.IOException: Connection timed out
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 10:53:46 WARN server.TransportChannelHandler: Exception in connection from /10.99.144.92:49021
java.io.IOException: Connection timed out
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelected

Re: RDD collect hangs on large input data

2015-03-29 Thread Akhil Das
Don't call .collect() if your data size is huge; you can simply do a count()
to trigger the execution.
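The difference matters because count() ships a single number back to the driver while collect() ships every element. A loose plain-Python analogy using a lazy generator (not Spark code; the function and data are made up):

```python
def transform(x):
    return x * 2

data = range(1_000_000)

# Like an RDD transformation: nothing is computed until something consumes it.
lazy = (transform(x) for x in data)

# Like count(): forces all the work, but only one int comes back.
n = sum(1 for _ in lazy)
print(n)  # 1000000
```

A collect()-style materialization would instead be `list(...)`, which copies all one million results into driver memory at once.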

Can you paste your exception stack trace so that we'll know what's happening?

Thanks
Best Regards

On Fri, Mar 27, 2015 at 9:18 PM, Zsolt Tóth 
wrote:

> [quoted message trimmed; see the original message below]


RDD collect hangs on large input data

2015-03-27 Thread Zsolt Tóth
Hi,

I have a simple Spark application: it creates an input RDD with
sc.textFile, and calls flatMapToPair, reduceByKey and map on it. The
output RDD is small, a few MBs. Then I call collect() on the output.

If the input is ~50GB, the job finishes in a few minutes. However, if it's
larger (~100GB), the execution hangs at the end of the collect() stage. The
UI shows one active job (collect), one completed stage (flatMapToPair) and
one active stage (collect). The collect stage has 880/892 tasks succeeded,
so I think the issue happens when the whole job is about to finish (every
task on the UI is either in the SUCCESS or the RUNNING state).
The driver and the containers don't log anything for 15 minutes, then I get
"Connection timed out".

I run the job in yarn-cluster mode on Amazon EMR with Spark 1.2.1 and
Hadoop 2.4.0.

This happens every time I run the process with the larger input, so I don't
think it's just a transient connection issue. Is this a Spark bug, or is
something wrong with my setup?

Zsolt
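For readers skimming the thread, the pipeline shape described above (flatMapToPair, then reduceByKey, then map, then collect) can be mimicked in plain Python. Word-count is used as a stand-in here, since the thread never shows the actual job logic:

```python
from collections import defaultdict

lines = ["a b a", "b c"]

# flatMapToPair: emit one (token, 1) pair per token in every line
pairs = [(w, 1) for line in lines for w in line.split()]

# reduceByKey: sum the values for each key
totals = defaultdict(int)
for k, v in pairs:
    totals[k] += v

# map + collect: materialize the (small) final result locally
result = sorted(totals.items())
print(result)  # [('a', 2), ('b', 2), ('c', 1)]
```

In the real job, only this final collect() step brings data back to the driver; everything before it runs on the executors.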