Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21385#discussion_r190327022
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -41,11 +50,15 @@ private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessa
      */
     private[shuffle] class UnsafeRowReceiver(
           queueSize: Int,
    +      numShuffleWriters: Int,
    +      checkpointIntervalMs: Long,
           override val rpcEnv: RpcEnv)
        extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with Logging {
      // Note that this queue will be drained from the main task thread and populated in the RPC
       // response thread.
    -  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
    +  private val queues = Array.fill(numShuffleWriters) {
    --- End diff ---
    
    Then it would be a configuration nightmare. It should be OK to start with the proposed approach and use the same size for all the queues. We can change it later if we see issues, as long as the internal details are not exposed to the user.


---
