Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21385#discussion_r190125731
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
     
       override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case r: UnsafeRowReceiverMessage =>
    -      queue.put(r)
    +      queues(r.writerId).put(r)
           context.reply(())
       }
     
       override def read(): Iterator[UnsafeRow] = {
         new NextIterator[UnsafeRow] {
    -      override def getNext(): UnsafeRow = queue.take() match {
    -        case ReceiverRow(r) => r
    -        case ReceiverEpochMarker() =>
    -          finished = true
    -          null
    +      // An array of flags for whether each writer ID has gotten an epoch marker.
    +      private val writerEpochMarkersReceived =
    +        mutable.Map.empty[Int, Boolean].withDefaultValue(false)
    +
    +      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
    --- End diff ---
    
    As I commented earlier in the design doc, I was in favor of a single queue because I thought it would minimize the thread count, avoiding unnecessary contention (as well as code complexity in this case), and also because it keeps the backpressure condition fairly simple (RPC requests block until the queue has room to write).
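
    To make the tradeoff concrete, here is a rough sketch of the single-queue variant I had in mind (the names are hypothetical, not this PR's code); the blocking `put` on a bounded queue is exactly the backpressure point:

    ```scala
    import java.util.concurrent.ArrayBlockingQueue

    object SingleQueueSketch {
      sealed trait Message
      case class Row(data: String) extends Message
      case class Marker(writerId: Int) extends Message

      // All writers share one bounded queue; the capacity is arbitrary here.
      private val queue = new ArrayBlockingQueue[Message](1024)

      // RPC handler side: put() blocks while the queue is full, so a slow
      // reader pushes back on every writer at once.
      def receive(m: Message): Unit = queue.put(m)

      // A single reader thread drains the shared queue.
      def readOne(): Message = queue.take()
    }
    ```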
    
    But having read some articles comparing `multiple writers, single reader on a single queue` with `single writer, single reader on one queue per writer`, the second approach looks likely to perform better unless we introduce a highly optimized queue like the Disruptor.
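
    For contrast, a sketch of the per-writer-queue shape (again with made-up names, just mirroring the structure of the diff above): each queue gets exactly one producer and one consumer thread, so writers never contend with each other:

    ```scala
    import java.util.concurrent.{ArrayBlockingQueue, Executors}

    // One bounded queue per writer: single producer, single consumer each.
    class PerWriterQueuesSketch(numWriters: Int) {
      private val queues =
        Array.fill(numWriters)(new ArrayBlockingQueue[String](1024))

      // RPC handler side: a full queue blocks only the writer it belongs to.
      def receive(writerId: Int, row: String): Unit = queues(writerId).put(row)

      // Reader side: one thread per queue, like the fixed thread pool in the diff.
      private val executor = Executors.newFixedThreadPool(numWriters)
      def startReaders(handle: (Int, String) => Unit): Unit =
        (0 until numWriters).foreach { id =>
          executor.execute { () =>
            try {
              while (true) handle(id, queues(id).take())
            } catch {
              case _: InterruptedException => // pool shutdownNow() stops the loop
            }
          }
        }
    }
    ```

    The cost is `numShuffleWriters` reader threads instead of one, but each `take()` touches only its own queue, so there is no cross-writer contention on the hot path.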
    
    So the approach looks great to me for now; at the least, we could consider replacing it with a dedicated queue library if this ever turns out to be a bottleneck.

