Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21385#discussion_r190125731

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }

   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived =
+        mutable.Map.empty[Int, Boolean].withDefaultValue(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --

As I commented earlier in the design doc, I was in favor of a single queue: I thought it would minimize the thread count, which may avoid unnecessary contention (as well as code complexity in this case), and it also keeps the backpressure condition simple (RPC requests block until the queue has room to write).

But after reading some articles comparing "multiple writers, single reader on a single queue" against "single writer, single reader on multiple queues (one per writer)", the second approach looks like it performs better unless we introduce a highly optimized queue like the Disruptor. So this approach looks good to me for now; at the least, we could consider replacing it with an adopted queue library if we run into trouble.
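To make the trade-off concrete, here is a minimal, hypothetical sketch (plain Java, not Spark's actual code) of the "single writer, single reader per queue" layout the diff adopts: each writer ID owns a bounded queue, a dedicated reader task drains each queue, and an end marker plays the role of the epoch marker. All names (`MultiQueueSketch`, `runSketch`, `END_MARKER`) are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "single writer, single reader on multiple
// queues per writer" (not the Spark implementation under review).
public class MultiQueueSketch {
    // Stand-in for the epoch marker in the PR: tells a reader its writer is done.
    static final int END_MARKER = Integer.MIN_VALUE;

    static List<Integer> runSketch(int numWriters) throws Exception {
        // One bounded queue per writer: a writer never contends with other writers.
        List<BlockingQueue<Integer>> queues = new ArrayList<>();
        for (int i = 0; i < numWriters; i++) {
            queues.add(new ArrayBlockingQueue<>(16));
        }
        BlockingQueue<Integer> merged = new ArrayBlockingQueue<>(numWriters * 16);

        // One reader thread per queue, mirroring newFixedThreadPool(numShuffleWriters).
        ExecutorService executor = Executors.newFixedThreadPool(numWriters);
        for (int i = 0; i < numWriters; i++) {
            BlockingQueue<Integer> q = queues.get(i);
            executor.submit(() -> {
                int v;
                // Each task is the single reader of its own queue.
                while ((v = q.take()) != END_MARKER) {
                    merged.put(v);
                }
                return null;
            });
        }

        // Each writer ID puts rows only into its own queue, then its end marker.
        for (int i = 0; i < numWriters; i++) {
            queues.get(i).put(i * 10);
            queues.get(i).put(i * 10 + 1);
            queues.get(i).put(END_MARKER);
        }

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);

        List<Integer> out = new ArrayList<>();
        merged.drainTo(out);
        out.sort(null);
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runSketch(3)); // prints [0, 1, 10, 11, 20, 21]
    }
}
```

The backpressure behavior the comment mentions falls out of the bounded queues: a writer's `put` blocks when its own queue is full, without stalling the other writers.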