GitHub user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21385#discussion_r190328335
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
     
       override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case r: UnsafeRowReceiverMessage =>
    -      queue.put(r)
    +      queues(r.writerId).put(r)
           context.reply(())
       }
     
       override def read(): Iterator[UnsafeRow] = {
         new NextIterator[UnsafeRow] {
    -      override def getNext(): UnsafeRow = queue.take() match {
    -        case ReceiverRow(r) => r
    -        case ReceiverEpochMarker() =>
    -          finished = true
    -          null
    +      // An array of flags for whether each writer ID has gotten an epoch marker.
    +      private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
    +
    +      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
    --- End diff --
    
    I tried to make that work, but I couldn't figure out how to determine from the task thread which queue is the right one. We can't round-robin, or else we'll fail to make progress if some writers send more data than others.
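
    To illustrate the pattern (a hypothetical, self-contained Scala sketch, not the actual UnsafeRowReceiver code; Message/Row/EpochMarker and MultiQueueReaderSketch are made-up names): each writer gets its own queue, one blocking take() per queue runs on a pool thread, and the reader consumes whichever take() completes first, so an idle writer never stalls the others.

        import java.util.concurrent.{ArrayBlockingQueue, Callable, ExecutorCompletionService, Executors}

        // Hypothetical stand-ins for the receiver messages.
        sealed trait Message { def writerId: Int }
        case class Row(writerId: Int, value: String) extends Message
        case class EpochMarker(writerId: Int) extends Message

        object MultiQueueReaderSketch {
          def main(args: Array[String]): Unit = {
            val numShuffleWriters = 3
            // One queue per writer, so a chatty writer can't starve the others.
            val queues = Array.fill(numShuffleWriters)(new ArrayBlockingQueue[Message](1024))

            // One blocking take() per writer queue runs on its own pool thread; the
            // reader asks the completion service for whichever take() finished first,
            // so it never blocks on a queue that happens to be empty.
            val executor = Executors.newFixedThreadPool(numShuffleWriters)
            val completion = new ExecutorCompletionService[Message](executor)
            def submitTake(writerId: Int): Unit = completion.submit(new Callable[Message] {
              override def call(): Message = queues(writerId).take()
            })
            (0 until numShuffleWriters).foreach(submitTake)

            // Simulate writers: writer 0 sends two rows, the others only epoch markers.
            queues(0).put(Row(0, "a")); queues(0).put(Row(0, "b")); queues(0).put(EpochMarker(0))
            queues(1).put(EpochMarker(1))
            queues(2).put(EpochMarker(2))

            // Read until every writer has delivered its epoch marker.
            val markerSeen = Array.fill(numShuffleWriters)(false)
            while (!markerSeen.forall(identity)) {
              completion.take().get() match {
                case Row(id, v) =>
                  println(s"row from writer $id: $v")
                  submitTake(id)         // keep draining this writer's queue
                case EpochMarker(id) =>
                  markerSeen(id) = true  // this writer is done for the epoch
              }
            }
            executor.shutdown()
          }
        }

    A plain round-robin queues(i).take() from the task thread would block on a quiet writer even while another writer's queue has rows ready, which is exactly the progress problem described above.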
    
    As mentioned in the doc, this iteration of the shuffle functionality isn't meant to be particularly scalable - we're already incurring a bunch of overhead from using the RPC framework to send data around. The focus for now is correctness and debuggability. Once we have everything in place, we'll swap out everything between ContinuousShuffleReader and ContinuousShuffleWriter for some sort of TCP socket approach, which will probably be able to do away with the N queues entirely by leveraging TCP backpressure.

