[jira] [Updated] (SPARK-44772) Reading blocks from remote executors causes timeout issue

2023-08-13 Thread nebi mert aydin (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

nebi mert aydin updated SPARK-44772:

Priority: Blocker  (was: Major)

> Reading blocks from remote executors  causes timeout issue
> --
>
> Key: SPARK-44772
> URL: https://issues.apache.org/jira/browse/SPARK-44772
> Project: Spark
>  Issue Type: Bug
>  Components: EC2, PySpark, Shuffle, Spark Core
>Affects Versions: 3.1.2
>Reporter: nebi mert aydin
>Priority: Blocker
>
> I'm using EMR 6.5 with Spark 3.1.2.
> I'm shuffling 1.5 TiB of data across 3000 executors, each with 4 cores and 
> 23 GB of memory.
> Speculative execution is also enabled.
> {code:java}
> // df.repartition(6000) {code}
> I see lots of failures with 
> {code:java}
> 2023-08-11 01:01:09,846 ERROR org.apache.spark.network.server.ChunkFetchRequestHandler (shuffle-server-4-95): Error sending result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=779084003612,chunkIndex=323],buffer=FileSegmentManagedBuffer[file=/mnt3/yarn/usercache/zeppelin/appcache/application_1691438567823_0012/blockmgr-0d82ca05-9429-4ff2-9f61-e779e8e60648/07/shuffle_5_114492_0.data,offset=1836997,length=618]] to /172.31.20.110:36654; closing connection
> java.nio.channels.ClosedChannelException
>   at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
>   at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>   at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
>   at org.sparkproject.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
>   at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
>   at org.sparkproject.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
>   at org.sparkproject.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:294)
>   at org.apache.spark.network.server.ChunkFetchRequestHandler.respond(ChunkFetchRequestHandler.java:142)
>   at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:116)
>   at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
>   at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
>   at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
>   at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>   at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>   at 
> 

[jira] [Updated] (SPARK-44772) Reading blocks from remote executors causes timeout issue

2023-08-10 Thread nebi mert aydin (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

nebi mert aydin updated SPARK-44772:

Component/s: Shuffle
 Spark Core


[jira] [Updated] (SPARK-44772) Reading blocks from remote executors causes timeout issue

2023-08-10 Thread nebi mert aydin (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

nebi mert aydin updated SPARK-44772:

Description: 
I'm using EMR 6.5 with Spark 3.1.2.

I'm shuffling 1.5 TiB of data across 3000 executors, each with 4 cores and 
23 GB of memory.

Speculative execution is also enabled.
{code:java}
// df.repartition(6000) {code}
I see lots of failures with 
{code:java}
2023-08-11 01:01:09,846 ERROR org.apache.spark.network.server.ChunkFetchRequestHandler (shuffle-server-4-95): Error sending result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=779084003612,chunkIndex=323],buffer=FileSegmentManagedBuffer[file=/mnt3/yarn/usercache/zeppelin/appcache/application_1691438567823_0012/blockmgr-0d82ca05-9429-4ff2-9f61-e779e8e60648/07/shuffle_5_114492_0.data,offset=1836997,length=618]] to /172.31.20.110:36654; closing connection
java.nio.channels.ClosedChannelException
at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at org.sparkproject.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
at org.sparkproject.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
at org.sparkproject.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:294)
at org.apache.spark.network.server.ChunkFetchRequestHandler.respond(ChunkFetchRequestHandler.java:142)
at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:116)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at 

[jira] [Created] (SPARK-44772) Reading blocks from remote executors causes timeout issue

2023-08-10 Thread nebi mert aydin (Jira)
nebi mert aydin created SPARK-44772:
---

 Summary: Reading blocks from remote executors  causes timeout issue
 Key: SPARK-44772
 URL: https://issues.apache.org/jira/browse/SPARK-44772
 Project: Spark
  Issue Type: Bug
  Components: EC2, PySpark
Affects Versions: 3.1.2
Reporter: nebi mert aydin


I'm using EMR 6.5 with Spark 3.1.2.

I'm shuffling 1.5 TiB of data across 3000 executors, each with 4 cores and 
23 GB of memory:
`df.repartition(6000)`
I see lots of failures like this:

```
2023-08-11 01:01:09,847 ERROR org.apache.spark.network.server.ChunkFetchRequestHandler (shuffle-server-4-95): Error sending result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=779084003612,chunkIndex=324],buffer=FileSegmentManagedBuffer[file=/mnt1/yarn/usercache/zeppelin/appcache/application_1691438567823_0012/blockmgr-b2f9bea5-068c-45c8-b324-1f132c87de54/24/shuffle_5_115515_0.data,offset=680394,length=255]] to /172.31.20.110:36654; closing connection
```

I tried disabling TCP segmentation offload and scatter-gather on the network interface:

```

sudo ethtool -K eth0 tso off
sudo ethtool -K eth0 sg off

```

That didn't help. My guess is that the external shuffle service is unable to 
send data to other executors for some reason.


--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2023-08-10 Thread nebi mert aydin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753038#comment-17753038
 ] 

nebi mert aydin commented on SPARK-24578:
-

I still see this problem on Amazon EMR with Spark 3.1.2. Any ideas?

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Assignee: Wenbo Zhao
>Priority: Blocker
> Fix For: 2.3.2, 2.4.0
>
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs:
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())