Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/21428#discussion_r194957906
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleReader.scala
---
@@ -68,7 +66,7 @@ private[shuffle] class UnsafeRowReceiver(
}
override def receiveAndReply(context: RpcCallContext):
PartialFunction[Any, Unit] = {
- case r: UnsafeRowReceiverMessage =>
+ case r: RPCContinuousShuffleMessage =>
queues(r.writerId).put(r)
--- End diff --
All RPC messages inside Spark are processed in [a shared fixed thread
pool](https://github.com/apache/spark/blob/v2.3.1/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala#L197),
so we cannot make blocking calls on an RPC thread.
I think we fundamentally need to design a backpressure mechanism in the future,
because a receiver cannot stop a sender from sending data. For example, even if
we block here, the sender will keep sending and eventually fill up the TCP
buffer. We also cannot rely on TCP backpressure alone, since we need to
multiplex many streams over the same TCP connection in order to support
thousands of machines.
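To illustrate the non-blocking constraint, here is a minimal Java sketch (not Spark's actual code; the class and names are illustrative): the handler running on the shared dispatcher thread hands the potentially blocking `put` off to a dedicated per-receiver thread, so the dispatcher thread returns immediately. Note that this only moves the buffering into the executor's task queue rather than creating real backpressure, which is exactly the limitation described above.

```java
import java.util.concurrent.*;

// Sketch only: an RPC handler must not block the shared dispatcher
// thread, so the blocking queue.put is offloaded to a single-thread
// executor owned by this receiver.
class NonBlockingReceiver {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    private final ExecutorService deliveryPool = Executors.newSingleThreadExecutor();

    // Called from the shared RPC dispatcher thread; returns immediately.
    void receive(String message) {
        deliveryPool.submit(() -> {
            try {
                // May block on a full queue, but only the delivery
                // thread blocks -- never the dispatcher thread.
                queue.put(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Consumer side: blocks until a message is available.
    String take() throws InterruptedException {
        return queue.take();
    }

    void shutdown() {
        deliveryPool.shutdown();
    }
}
```

The downside is visible in the sketch itself: if the consumer stalls, submitted tasks pile up in the executor's unbounded work queue instead of the TCP buffer, so the sender is still never slowed down.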
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]