Hi, I'm running into consistent failures during a shuffle read while trying to do a group-by followed by a count aggregation (using the DataFrame API on Spark 1.5.2).
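For concreteness, the job is essentially the following (column name, paths, and input/output formats below are placeholders, not my actual code):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("groupby-count"))
    val sqlContext = new SQLContext(sc)

    // ~100 billion input rows; "key" is a 24-byte grouping key
    val input = sqlContext.read.parquet("hdfs:///path/to/input")

    // group-by followed by a count aggregation (~20 billion groups)
    val counts = input.groupBy("key").count()

    // writing the result triggers the shuffle; the stage 1 shuffle read is where it fails
    counts.write.parquet("hdfs:///path/to/output")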
The shuffle read (in stage 1) fails with:

    org.apache.spark.shuffle.FetchFailedException: Failed to send RPC 7719188499899260109 to host_a/ip_a:35946: java.nio.channels.ClosedChannelException
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321)

Looking into the executor logs, the node that threw the FetchFailedException (host_a) first shows:

    ERROR TransportChannelHandler: Connection to host_b/ip_b:38804 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.

and the node referenced in the exception (host_b) shows:

    ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=207789700738, chunkIndex=894}, buffer=FileSegmentManagedBuffer{file=/local_disk/spark-ed6667d4-445b-4d65-bfda-e4540b7215aa/executor-d03e5e7e-57d4-40e2-9021-c20d0b84bf75/blockmgr-05d5f2b6-142e-415c-a08b-58d16a10b8bf/27/shuffle_1_13732_0.data, offset=18960736, length=19477}} to /ip_a:32991; closing connection

The error in the host_b logs occurred a few seconds after the error in the host_a logs.

I noticed a lot of spilling during the shuffle read, so I tried to work around the problem by increasing the number of shuffle partitions (to reduce spilling) and by increasing spark.network.timeout (the settings I tried are sketched in the P.S. below). Neither change got rid of the connection failures.

The fetch failure causes part of stage 0 to be recomputed (which runs successfully). Stage 1 retry 1 then always fails with:

    java.io.IOException: FAILED_TO_UNCOMPRESS(5)
        at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)

Changing spark.io.compression.codec to lz4 turns this error into:

    java.io.IOException: Stream is corrupted
        at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:153)

which leads me to believe that the timeout during the shuffle read failure leaves invalid shuffle files on disk.

Notably, these failures do not occur when I run on smaller subsets of the data. The failure occurs while attempting to group ~100 billion rows into ~20 billion groups (24-byte keys, with count as the only aggregation) on a 16-node cluster. I've reproduced this failure on 2 completely separate clusters (both running the standalone cluster manager).

Does anyone have suggestions for how I could make this crash go away, or for how I could construct a smaller failing test case so the bug can be investigated more easily?

Best,
Eric Martin
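P.S. For reference, the workaround attempts looked roughly like this (using the sqlContext from the sketch above); the specific values are illustrative rather than the exact numbers I used:

    // set before starting the application, e.g. via SparkConf or spark-defaults.conf
    val conf = new SparkConf()
      .setAppName("groupby-count")
      .set("spark.network.timeout", "600s")        // raised from the 120s default
      .set("spark.io.compression.codec", "lz4")    // switched from the default snappy

    // raise the number of shuffle partitions from the default 200 to reduce spilling
    sqlContext.setConf("spark.sql.shuffle.partitions", "20000")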