Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21385#discussion_r190328335
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }

   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
--- End diff ---
I tried to make that work, but I couldn't figure out how to determine, from
the task thread, which queue is the right one to read from next. We can't just
round-robin across the queues, or we'll fail to make progress whenever some
writers send more data than others: the task thread would block on an empty
queue while rows pile up in the busier ones.

As mentioned in the doc, this iteration of the shuffle functionality isn't
meant to be particularly scalable - we're already incurring significant overhead
from using the RPC framework to send data around. The focus for now is
correctness and debuggability. Once everything is in place, we'll replace
everything between ContinuousShuffleReader and ContinuousShuffleWriter with
some sort of TCP socket approach, which will probably be able to do away with
the N queues entirely by leveraging TCP backpressure.
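For concreteness, here's a minimal sketch of the per-writer-queue merge
described above. The names (MultiQueueReader, readEpoch, the simplified
message types) are hypothetical stand-ins rather than the PR's actual classes,
and it ignores multi-epoch interleaving and shutdown:

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors}

// Hypothetical stand-ins for the message types in this PR.
sealed trait ReceiverMessage { def writerId: Int }
case class ReceiverRow(writerId: Int, row: String) extends ReceiverMessage
case class ReceiverEpochMarker(writerId: Int) extends ReceiverMessage

class MultiQueueReader(numShuffleWriters: Int) {
  // One bounded queue per writer, so a chatty writer can't starve the rest.
  private val queues =
    Array.fill(numShuffleWriters)(new ArrayBlockingQueue[ReceiverMessage](1024))

  // RPC endpoint side: route each incoming message to its writer's queue.
  def receive(m: ReceiverMessage): Unit = queues(m.writerId).put(m)

  // Reader side: one blocking thread per writer funnels messages into a
  // single combined queue that the task thread can take() from.
  private val combined = new ArrayBlockingQueue[ReceiverMessage](1024)
  private val executor = Executors.newFixedThreadPool(numShuffleWriters)
  (0 until numShuffleWriters).foreach { i =>
    executor.execute(new Runnable {
      override def run(): Unit = while (true) combined.put(queues(i).take())
    })
  }

  // Drain one epoch: it ends only after every writer's marker has arrived.
  def readEpoch(): Seq[String] = {
    val markerSeen = Array.fill(numShuffleWriters)(false)
    val rows = Seq.newBuilder[String]
    while (markerSeen.contains(false)) {
      combined.take() match {
        case ReceiverRow(_, r)       => rows += r
        case ReceiverEpochMarker(id) => markerSeen(id) = true
      }
    }
    rows.result()
  }
}
```

With one blocking thread per writer, a slow writer just leaves its thread
parked on take() without holding up the others, and the epoch closes only once
every writer's marker has arrived.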
---