[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r191149214
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -41,11 +50,15 @@ private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessa
  */
 private[shuffle] class UnsafeRowReceiver(
     queueSize: Int,
+    numShuffleWriters: Int,
+    checkpointIntervalMs: Long,
     override val rpcEnv: RpcEnv)
   extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with Logging {
   // Note that this queue will be drained from the main task thread and populated in the RPC
   // response thread.
-  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+  private val queues = Array.fill(numShuffleWriters) {
--- End diff --
Hi TD, just a question here: what does 'a non-RPC-endpoint-based transfer mechanism' refer to?
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21385
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190730661
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest {
   }
   test("blocks waiting for new rows") {
-    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val rdd = new ContinuousShuffleReadRDD(
+      sparkContext, numPartitions = 1, checkpointIntervalMs = Long.MaxValue)
+    val epoch = rdd.compute(rdd.partitions(0), ctx)
     val readRowThread = new Thread {
       override def run(): Unit = {
-        // set the non-inheritable thread local
-        TaskContext.setTaskContext(ctx)
-        val epoch = rdd.compute(rdd.partitions(0), ctx)
-        epoch.next().getInt(0)
+        try {
+          epoch.next().getInt(0)
+        } catch {
+          case _: InterruptedException => // do nothing - expected at test ending
+        }
       }
     }
     try {
       readRowThread.start()
       eventually(timeout(streamingTimeout)) {
-        assert(readRowThread.getState == Thread.State.WAITING)
+        assert(readRowThread.getState == Thread.State.TIMED_WAITING)
--- End diff --
I see. That's good to know!
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190684685
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest {
   }
   test("blocks waiting for new rows") {
-    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val rdd = new ContinuousShuffleReadRDD(
+      sparkContext, numPartitions = 1, checkpointIntervalMs = Long.MaxValue)
+    val epoch = rdd.compute(rdd.partitions(0), ctx)
     val readRowThread = new Thread {
       override def run(): Unit = {
-        // set the non-inheritable thread local
-        TaskContext.setTaskContext(ctx)
-        val epoch = rdd.compute(rdd.partitions(0), ctx)
-        epoch.next().getInt(0)
+        try {
+          epoch.next().getInt(0)
+        } catch {
+          case _: InterruptedException => // do nothing - expected at test ending
+        }
       }
     }
     try {
       readRowThread.start()
       eventually(timeout(streamingTimeout)) {
-        assert(readRowThread.getState == Thread.State.WAITING)
+        assert(readRowThread.getState == Thread.State.TIMED_WAITING)
--- End diff --
It changed because we added a timeout. The JVM always puts threads waiting without a timeout into WAITING, and threads waiting with a timeout into TIMED_WAITING.
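The following sketch shows the JVM behavior described above using only the JDK, without Spark. A thread parked in `ArrayBlockingQueue.take()` reports WAITING. A thread parked in `poll(timeout, unit)` reports TIMED_WAITING, and a timed poll of this kind is what the new `completion.poll(checkpointIntervalMs, ...)` call performs. `ThreadStateDemo` and its methods are hypothetical names chosen for illustration.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical demo: report the state of a thread parked on an empty queue.
object ThreadStateDemo {
  // Runs `block` on a fresh thread, waits until the thread leaves NEW/RUNNABLE,
  // records that state, then interrupts the thread so it exits cleanly.
  def blockedState(block: ArrayBlockingQueue[String] => Unit): Thread.State = {
    val queue = new ArrayBlockingQueue[String](1) // stays empty, so `block` parks
    val t = new Thread {
      override def run(): Unit =
        try block(queue)
        catch { case _: InterruptedException => () } // expected on interrupt below
    }
    t.start()
    var state = t.getState
    while (state == Thread.State.NEW || state == Thread.State.RUNNABLE) {
      Thread.sleep(10)
      state = t.getState
    }
    t.interrupt()
    t.join()
    state
  }

  // take() waits with no timeout, so the JVM reports WAITING.
  def untimed: Thread.State = blockedState(q => q.take())

  // poll(timeout, unit) waits with a timeout, so the JVM reports TIMED_WAITING.
  def timed: Thread.State = blockedState(q => q.poll(1, TimeUnit.HOURS))
}
```

The blocked state is sampled once the thread has parked, not at a fixed moment. This follows the same idea as the test's `eventually` loop.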
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190403472
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest {
   }
   test("blocks waiting for new rows") {
-    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val rdd = new ContinuousShuffleReadRDD(
+      sparkContext, numPartitions = 1, checkpointIntervalMs = Long.MaxValue)
+    val epoch = rdd.compute(rdd.partitions(0), ctx)
     val readRowThread = new Thread {
       override def run(): Unit = {
-        // set the non-inheritable thread local
-        TaskContext.setTaskContext(ctx)
-        val epoch = rdd.compute(rdd.partitions(0), ctx)
-        epoch.next().getInt(0)
+        try {
+          epoch.next().getInt(0)
+        } catch {
+          case _: InterruptedException => // do nothing - expected at test ending
+        }
       }
     }
     try {
       readRowThread.start()
       eventually(timeout(streamingTimeout)) {
-        assert(readRowThread.getState == Thread.State.WAITING)
+        assert(readRowThread.getState == Thread.State.TIMED_WAITING)
       }
     } finally {
       readRowThread.interrupt()
       readRowThread.join()
     }
   }
+
+  test("multiple writers") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1, numShuffleWriters = 3)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    send(
+      endpoint,
+      ReceiverRow(0, unsafeRow("writer0-row0")),
+      ReceiverRow(1, unsafeRow("writer1-row0")),
+      ReceiverRow(2, unsafeRow("writer2-row0")),
+      ReceiverEpochMarker(0),
+      ReceiverEpochMarker(1),
+      ReceiverEpochMarker(2)
+    )
+
+    val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+    assert(firstEpoch.toSeq.map(_.getUTF8String(0).toString).toSet ==
+      Set("writer0-row0", "writer1-row0", "writer2-row0"))
+  }
+
+  test("epoch only ends when all writers send markers") {
+    val rdd = new ContinuousShuffleReadRDD(
+      sparkContext, numPartitions = 1, numShuffleWriters = 3, checkpointIntervalMs = Long.MaxValue)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    send(
+      endpoint,
+      ReceiverRow(0, unsafeRow("writer0-row0")),
+      ReceiverRow(1, unsafeRow("writer1-row0")),
+      ReceiverRow(2, unsafeRow("writer2-row0")),
+      ReceiverEpochMarker(0),
+      ReceiverEpochMarker(2)
+    )
+
+    val epoch = rdd.compute(rdd.partitions(0), ctx)
+    val rows = (0 until 3).map(_ => epoch.next()).toSet
+    assert(rows.map(_.getUTF8String(0).toString) ==
+      Set("writer0-row0", "writer1-row0", "writer2-row0"))
+
+    // After checking the right rows, block until we get an epoch marker indicating there's no next.
+    // (Also fail the assertion if for some reason we get a row.)
+    val readEpochMarkerThread = new Thread {
+      override def run(): Unit = {
+        assert(!epoch.hasNext)
+      }
+    }
+
+    readEpochMarkerThread.start()
+    eventually(timeout(streamingTimeout)) {
+      assert(readEpochMarkerThread.getState == Thread.State.TIMED_WAITING)
--- End diff --
Same question as above: is this the only possible thread state?
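The rule tested in "epoch only ends when all writers send markers" can be stated as a small single-threaded model. This is a hypothetical sketch, not Spark code: `EpochModel` and its message encoding are assumptions. A row counts toward the epoch only while its writer has not yet sent a marker. The epoch ends only once every writer has sent one. The test sends markers from writers 0 and 2 only, so the epoch must stay open.

```scala
// Hypothetical single-threaded model of the epoch rule (not Spark code).
// A message is (writerId, Some(row)) for a row or (writerId, None) for an epoch marker.
object EpochModel {
  final case class Result(rows: Set[String], epochEnded: Boolean)

  def consume(numWriters: Int, messages: Seq[(Int, Option[String])]): Result = {
    val markerSeen = Array.fill(numWriters)(false)
    val rows = Set.newBuilder[String]
    messages.foreach {
      // A row counts toward this epoch only if its writer has not sent a marker yet.
      case (writerId, Some(row)) if !markerSeen(writerId) => rows += row
      // Rows after a writer's marker belong to the next epoch, so skip them here.
      case (_, Some(_)) => ()
      case (writerId, None) => markerSeen(writerId) = true
    }
    // The epoch ends only once every writer has sent its marker.
    Result(rows.result(), markerSeen.forall(_ == true))
  }
}
```

Skipping rows that arrive after a writer's marker mirrors the diff, where no new read task is submitted for that writer until the next epoch.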
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190403327
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest {
   }
   test("blocks waiting for new rows") {
-    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val rdd = new ContinuousShuffleReadRDD(
+      sparkContext, numPartitions = 1, checkpointIntervalMs = Long.MaxValue)
+    val epoch = rdd.compute(rdd.partitions(0), ctx)
     val readRowThread = new Thread {
       override def run(): Unit = {
-        // set the non-inheritable thread local
-        TaskContext.setTaskContext(ctx)
-        val epoch = rdd.compute(rdd.partitions(0), ctx)
-        epoch.next().getInt(0)
+        try {
+          epoch.next().getInt(0)
+        } catch {
+          case _: InterruptedException => // do nothing - expected at test ending
+        }
       }
     }
     try {
       readRowThread.start()
       eventually(timeout(streamingTimeout)) {
-        assert(readRowThread.getState == Thread.State.WAITING)
+        assert(readRowThread.getState == Thread.State.TIMED_WAITING)
--- End diff --
Why did this change from WAITING to TIMED_WAITING? Can the thread be in either state? Would this cause flakiness?
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190402783
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+      private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+      private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+        override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
       }
-      override def close(): Unit = {}
+      // Initialize by submitting tasks to read the first row from each writer.
+      (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+      /**
+       * In each call to getNext(), we pull the next row available in the completion queue, and then
+       * submit another task to read the next row from the writer which returned it.
+       *
+       * When a writer sends an epoch marker, we note that it's finished and don't submit another
+       * task for it in this epoch. The iterator is over once all writers have sent an epoch marker.
+       */
+      override def getNext(): UnsafeRow = {
+        var nextRow: UnsafeRow = null
+        while (nextRow == null) {
+          nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match {
+            case null =>
+              // Try again if the poll didn't wait long enough to get a real result.
+              // But we should be getting at least an epoch marker every checkpoint interval.
+              logWarning(
+                s"Completion service failed to make progress after $checkpointIntervalMs ms")
+              null
+
+            // The completion service guarantees this future will be available immediately.
+            case future => future.get() match {
+              case ReceiverRow(writerId, r) =>
+                // Start reading the next element in the queue we just took from.
+                completion.submit(completionTask(writerId))
+                r
+              case ReceiverEpochMarker(writerId) =>
+                // Don't read any more from this queue. If all the writers have sent epoch markers,
+                // the epoch is over; otherwise we need to poll from the remaining writers.
+                writerEpochMarkersReceived(writerId) = true
+                if (writerEpochMarkersReceived.forall(flag => flag)) {
+                  finished = true
+                  // Break out of the while loop and end the iterator.
+                  return null
--- End diff --
Actually, `finished` is already the flag; just use it for the while loop, that is, `while (!finished && nextRow == null)`.
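Below is a minimal, self-contained sketch of how `getNext()` might look with the restructuring suggestions in this thread applied. The loop runs on `finished`, and `nextRow` is assigned inside the row case, which removes both the early `return` and the `else null` branch. The all-markers check uses `forall(_ == true)`. The timeout branch also names the writers still pending, as another comment in this thread suggests. The sketch does not use Spark. `Msg`, `Row`, `EpochMarker`, and `MultiWriterReader` are hypothetical stand-ins for `UnsafeRowReceiverMessage`, `ReceiverRow`, `ReceiverEpochMarker`, and the receiver. `println` stands in for `logWarning`.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Callable, ExecutorCompletionService, Executors, TimeUnit}

// Hypothetical message types standing in for ReceiverRow / ReceiverEpochMarker.
sealed trait Msg { def writerId: Int }
final case class Row(writerId: Int, value: String) extends Msg
final case class EpochMarker(writerId: Int) extends Msg

// Hypothetical stand-in for the receiver: one bounded queue per writer, fanned in
// through an ExecutorCompletionService as in the diff.
class MultiWriterReader(numWriters: Int, queueSize: Int, pollIntervalMs: Long) {
  val queues: Array[ArrayBlockingQueue[Msg]] =
    Array.fill(numWriters)(new ArrayBlockingQueue[Msg](queueSize))

  private val executor = Executors.newFixedThreadPool(numWriters)
  private val completion = new ExecutorCompletionService[Msg](executor)

  private def completionTask(writerId: Int): Callable[Msg] = new Callable[Msg] {
    override def call(): Msg = queues(writerId).take()
  }

  /** Reads one epoch: returns all rows until every writer has sent an epoch marker. */
  def readEpoch(): Seq[String] = {
    val markerReceived = Array.fill(numWriters)(false)
    var finished = false
    (0 until numWriters).foreach(id => completion.submit(completionTask(id)))

    // Mirrors the NextIterator contract: returns null once `finished` is set.
    def getNext(): String = {
      var nextRow: String = null
      // The loop ends on a row or on `finished`; no early `return` is needed.
      while (!finished && nextRow == null) {
        completion.poll(pollIntervalMs, TimeUnit.MILLISECONDS) match {
          case null =>
            val pending = (0 until numWriters).filterNot(markerReceived(_))
            println(s"No progress after $pollIntervalMs ms; waiting on writers $pending")
          case future =>
            future.get() match {
              case Row(writerId, value) =>
                // Keep reading from the writer that just produced a row.
                completion.submit(completionTask(writerId))
                nextRow = value
              case EpochMarker(writerId) =>
                // Stop reading this writer for the epoch; finish once all have reported.
                markerReceived(writerId) = true
                finished = markerReceived.forall(_ == true)
            }
        }
      }
      nextRow
    }

    Iterator.continually(getNext()).takeWhile(_ != null).toList
  }

  def shutdown(): Unit = executor.shutdownNow()
}
```

With this shape, each `match` arm updates either `nextRow` or `finished`, and the `while` condition alone decides when to stop.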
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190402584
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+      private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+      private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+        override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
       }
-      override def close(): Unit = {}
+      // Initialize by submitting tasks to read the first row from each writer.
+      (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+      /**
+       * In each call to getNext(), we pull the next row available in the completion queue, and then
+       * submit another task to read the next row from the writer which returned it.
+       *
+       * When a writer sends an epoch marker, we note that it's finished and don't submit another
+       * task for it in this epoch. The iterator is over once all writers have sent an epoch marker.
+       */
+      override def getNext(): UnsafeRow = {
+        var nextRow: UnsafeRow = null
+        while (nextRow == null) {
+          nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match {
+            case null =>
+              // Try again if the poll didn't wait long enough to get a real result.
+              // But we should be getting at least an epoch marker every checkpoint interval.
+              logWarning(
+                s"Completion service failed to make progress after $checkpointIntervalMs ms")
+              null
+
+            // The completion service guarantees this future will be available immediately.
+            case future => future.get() match {
+              case ReceiverRow(writerId, r) =>
+                // Start reading the next element in the queue we just took from.
+                completion.submit(completionTask(writerId))
+                r
+              case ReceiverEpochMarker(writerId) =>
+                // Don't read any more from this queue. If all the writers have sent epoch markers,
+                // the epoch is over; otherwise we need to poll from the remaining writers.
+                writerEpochMarkersReceived(writerId) = true
+                if (writerEpochMarkersReceived.forall(flag => flag)) {
+                  finished = true
+                  // Break out of the while loop and end the iterator.
+                  return null
+                } else {
+                  // Poll again for the next completion result.
+                  null
+                }
--- End diff --
If you put `nextRow = newReceivedRow` inside the `case ReceiverRow`, then this else clause is not needed.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190401710
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+      private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+      private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+        override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
       }
-      override def close(): Unit = {}
+      // Initialize by submitting tasks to read the first row from each writer.
+      (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+      /**
+       * In each call to getNext(), we pull the next row available in the completion queue, and then
+       * submit another task to read the next row from the writer which returned it.
+       *
+       * When a writer sends an epoch marker, we note that it's finished and don't submit another
+       * task for it in this epoch. The iterator is over once all writers have sent an epoch marker.
+       */
+      override def getNext(): UnsafeRow = {
+        var nextRow: UnsafeRow = null
+        while (nextRow == null) {
+          nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match {
+            case null =>
+              // Try again if the poll didn't wait long enough to get a real result.
+              // But we should be getting at least an epoch marker every checkpoint interval.
+              logWarning(
+                s"Completion service failed to make progress after $checkpointIntervalMs ms")
+              null
+
+            // The completion service guarantees this future will be available immediately.
+            case future => future.get() match {
+              case ReceiverRow(writerId, r) =>
+                // Start reading the next element in the queue we just took from.
+                completion.submit(completionTask(writerId))
+                r
+              case ReceiverEpochMarker(writerId) =>
+                // Don't read any more from this queue. If all the writers have sent epoch markers,
+                // the epoch is over; otherwise we need to poll from the remaining writers.
+                writerEpochMarkersReceived(writerId) = true
+                if (writerEpochMarkersReceived.forall(flag => flag)) {
--- End diff --
Super nit: `writerEpochMarkersReceived.forall(_ == true)` is easier to understand.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190401100
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+      private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+      private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+        override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
       }
-      override def close(): Unit = {}
+      // Initialize by submitting tasks to read the first row from each writer.
+      (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+      /**
+       * In each call to getNext(), we pull the next row available in the completion queue, and then
+       * submit another task to read the next row from the writer which returned it.
+       *
+       * When a writer sends an epoch marker, we note that it's finished and don't submit another
+       * task for it in this epoch. The iterator is over once all writers have sent an epoch marker.
+       */
+      override def getNext(): UnsafeRow = {
+        var nextRow: UnsafeRow = null
+        while (nextRow == null) {
+          nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match {
+            case null =>
+              // Try again if the poll didn't wait long enough to get a real result.
+              // But we should be getting at least an epoch marker every checkpoint interval.
+              logWarning(
+                s"Completion service failed to make progress after $checkpointIntervalMs ms")
+              null
+
+            // The completion service guarantees this future will be available immediately.
+            case future => future.get() match {
+              case ReceiverRow(writerId, r) =>
+                // Start reading the next element in the queue we just took from.
+                completion.submit(completionTask(writerId))
+                r
+              case ReceiverEpochMarker(writerId) =>
+                // Don't read any more from this queue. If all the writers have sent epoch markers,
+                // the epoch is over; otherwise we need to poll from the remaining writers.
+                writerEpochMarkersReceived(writerId) = true
+                if (writerEpochMarkersReceived.forall(flag => flag)) {
+                  finished = true
+                  // Break out of the while loop and end the iterator.
+                  return null
--- End diff --
Super nit: I personally find these sorts of escapes hard to read in code. Consider making a separate flag to end the while loop.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190400926
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+      private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+      private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+        override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
       }
-      override def close(): Unit = {}
+      // Initialize by submitting tasks to read the first row from each writer.
+      (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+      /**
+       * In each call to getNext(), we pull the next row available in the completion queue, and then
+       * submit another task to read the next row from the writer which returned it.
+       *
+       * When a writer sends an epoch marker, we note that it's finished and don't submit another
+       * task for it in this epoch. The iterator is over once all writers have sent an epoch marker.
+       */
+      override def getNext(): UnsafeRow = {
+        var nextRow: UnsafeRow = null
+        while (nextRow == null) {
+          nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match {
--- End diff --
Move this assignment inside the match cases; it's hard to follow the pattern `x = // some large code structure`.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190398051
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+      private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+      private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+        override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
       }
-      override def close(): Unit = {}
+      // Initialize by submitting tasks to read the first row from each writer.
+      (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+      /**
+       * In each call to getNext(), we pull the next row available in the completion queue, and then
+       * submit another task to read the next row from the writer which returned it.
+       *
+       * When a writer sends an epoch marker, we note that it's finished and don't submit another
+       * task for it in this epoch. The iterator is over once all writers have sent an epoch marker.
+       */
+      override def getNext(): UnsafeRow = {
+        var nextRow: UnsafeRow = null
+        while (nextRow == null) {
+          nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match {
+            case null =>
+              // Try again if the poll didn't wait long enough to get a real result.
+              // But we should be getting at least an epoch marker every checkpoint interval.
+              logWarning(
+                s"Completion service failed to make progress after $checkpointIntervalMs ms")
--- End diff --
Maybe also print which writers (and how many) we are waiting on, so that we can debug.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190396813
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
@@ -42,16 +47,24 @@ case class ContinuousShuffleReadPartition(index: Int, queueSize: Int) extends Pa
  * RDD at the map side of each continuous processing shuffle task. Upstream tasks send their
  * shuffle output to the wrapped receivers in partitions of this RDD; each of the RDD's tasks
  * poll from their receiver until an epoch marker is sent.
+ *
+ * @param sc the RDD context
+ * @param numPartitions the number of read partitions for this RDD
+ * @param queueSize the size of the row buffers to use
+ * @param numShuffleWriters the number of continuous shuffle writers feeding into this RDD
+ * @param checkpointIntervalMs the checkpoint interval of the streaming query
  */
 class ContinuousShuffleReadRDD(
     sc: SparkContext,
     numPartitions: Int,
-    queueSize: Int = 1024)
+    queueSize: Int = 1024,
+    numShuffleWriters: Int = 1,
+    checkpointIntervalMs: Long = 1000)
--- End diff --
Isn't this the same as epochInterval? The term "epoch" is better known in the code than "checkpoint".
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190354487
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -41,11 +50,15 @@ private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessa
  */
 private[shuffle] class UnsafeRowReceiver(
     queueSize: Int,
+    numShuffleWriters: Int,
+    checkpointIntervalMs: Long,
     override val rpcEnv: RpcEnv)
   extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with Logging {
   // Note that this queue will be drained from the main task thread and populated in the RPC
   // response thread.
-  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+  private val queues = Array.fill(numShuffleWriters) {
--- End diff --
I agree. Issues at this level can be dealt with later, once we run this and observe them for real. Also, it's likely that we will eventually replace a whole lot of this with a non-RPC-endpoint-based transfer mechanism. So let's keep something very simple for now.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190331721
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --
Ah, yes, the N queues make it difficult. It would have been straightforward if there were only one queue. I guess here you would have to round-robin and poll the queues with very small timeouts so that you don't block on any queue. If this is a prototype, either approach is fine.
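Here is a minimal sketch of the round-robin alternative described above, for comparison with the completion-service version in the diff. `RoundRobinReader`, `nextMessage`, `active`, and `pollTimeoutMs` are hypothetical names. In this sketch, `active` would return false for writers that have already sent their epoch marker. Each empty queue visited costs up to `pollTimeoutMs` before the scan moves on. The completion service, by contrast, wakes as soon as any queue has data, so that latency is a trade-off to weigh.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical sketch: round-robin over per-writer queues using short timed polls.
object RoundRobinReader {
  /**
   * Returns (writerId, message) for the first message found, starting the scan at `start`.
   * Keeps cycling over the active queues until one of them yields a message.
   */
  def nextMessage[T <: AnyRef](
      queues: IndexedSeq[ArrayBlockingQueue[T]],
      active: Int => Boolean,
      start: Int,
      pollTimeoutMs: Long): (Int, T) = {
    require(queues.indices.exists(active), "at least one writer must still be active")
    var i = start % queues.length
    var result: (Int, T) = null
    while (result == null) {
      if (active(i)) {
        // A short timed poll, so one empty queue cannot block the reader forever.
        val msg = queues(i).poll(pollTimeoutMs, TimeUnit.MILLISECONDS)
        if (msg != null) result = (i, msg)
      }
      i = (i + 1) % queues.length
    }
    result
  }
}
```

A caller would pass `start = lastWriterId + 1` on the next call, so that a busy writer does not starve the others.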
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190328335
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      // An array of flags for whether each writer ID has gotten an epoch marker.
+      private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --
I tried to make that work, but I couldn't figure out how to determine from the task thread which queue is the right one. We can't round-robin, or else we'll fail to make progress if some writers send more data than others. As mentioned in the doc, this iteration of the shuffle functionality isn't meant to be particularly scalable; we're already incurring a bunch of overhead from using the RPC framework to send data around. The focus for now is correctness and debuggability. Once we have everything in place, we'll swap out everything between ContinuousShuffleReader and ContinuousShuffleWriter with some sort of TCP socket approach, which will probably be able to do away with the N queues entirely by leveraging TCP backpressure.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190327022 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -41,11 +50,15 @@ private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessa */ private[shuffle] class UnsafeRowReceiver( queueSize: Int, + numShuffleWriters: Int, + checkpointIntervalMs: Long, override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with Logging { // Note that this queue will be drained from the main task thread and populated in the RPC // response thread. - private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize) + private val queues = Array.fill(numShuffleWriters) { --- End diff -- Then it would be a configuration nightmare. It should be OK to start with the proposed approach and use the same size for all the queues. We can change it later if we see issues, as long as the internal details are not exposed to the user.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190324822 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -41,11 +50,15 @@ private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessa */ private[shuffle] class UnsafeRowReceiver( queueSize: Int, + numShuffleWriters: Int, + checkpointIntervalMs: Long, override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with Logging { // Note that this queue will be drained from the main task thread and populated in the RPC // response thread. - private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize) + private val queues = Array.fill(numShuffleWriters) { --- End diff -- Well, is it a problem if we don't set a single size across all the queues?
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190320670 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) --- End diff -- Are we creating as many threads as there are writers here, and do we need them? I assume there is already an RDD task thread per partition invoking `getNext`; can't we return the row from the right queue(s) there? My concern is that these threads will bloat the total thread count to m*n (RDD partitions * shuffle writers), which may not scale well given the amount of context switching.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190316408 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -41,11 +50,15 @@ private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessa */ private[shuffle] class UnsafeRowReceiver( queueSize: Int, + numShuffleWriters: Int, + checkpointIntervalMs: Long, override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with Logging { // Note that this queue will be drained from the main task thread and populated in the RPC // response thread. - private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize) + private val queues = Array.fill(numShuffleWriters) { --- End diff -- A queue per writer makes it easier to handle the alignment. However, if the queues do not fill evenly, it becomes much more difficult to choose a single size for all of them. With a single queue this would not be much of an issue, since that queue is shared by all writers.
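To make the sizing concern concrete, here is a minimal sketch of per-writer bounded queues that all share one capacity. It assumes simplified, hypothetical types (`PwMessage`, `PwRow`, `PwEpochMarker`, `PerWriterQueues`) in place of the PR's RPC endpoint. Routing by `writerId` mirrors `queues(r.writerId).put(r)`, so backpressure applies per writer: a busy writer can block on its own full queue while the other writers' queues still have room.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical message types; only the writerId routing matters here.
sealed trait PwMessage { def writerId: Int }
final case class PwRow(writerId: Int, value: String) extends PwMessage
final case class PwEpochMarker(writerId: Int) extends PwMessage

final class PerWriterQueues(numWriters: Int, queueSize: Int) {
  // One bounded queue per writer; every queue gets the same capacity.
  val queues: Array[ArrayBlockingQueue[PwMessage]] =
    Array.fill(numWriters)(new ArrayBlockingQueue[PwMessage](queueSize))

  // Like `queues(r.writerId).put(r)`: blocks only while this writer's own queue is full.
  def receive(msg: PwMessage): Unit = queues(msg.writerId).put(msg)

  // Non-blocking variant, handy for observing per-writer backpressure.
  def tryReceive(msg: PwMessage): Boolean = queues(msg.writerId).offer(msg)
}
```

With a single shared queue, the same capacity would instead be consumed by whichever writers happen to be fastest.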
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190305870 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = --- End diff -- Good point. Now that I think about it, I guess that was what TD was suggesting too.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190131693 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = +mutable.Map.empty[Int, Boolean].withDefaultValue(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) --- End diff -- I'm also now seeing this approach as an alternative way to deal with alignment (rather than buffering rows explicitly, just stop reading from a writer once its epoch marker comes in). Nice approach.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190129892 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = +mutable.Map.empty[Int, Boolean].withDefaultValue(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + // Initialize by submitting tasks to read the first row from each writer. + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + /** + * In each call to getNext(), we pull the next row available in the completion queue, and then + * submit another task to read the next row from the writer which returned it. + * + * When a writer sends an epoch marker, we note that it's finished and don't submit another + * task for it in this epoch. The iterator is over once all writers have sent an epoch marker. 
+ */ override def getNext(): UnsafeRow = { +var nextRow: UnsafeRow = null +while (nextRow == null) { + nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match { +case null => + // Try again if the poll didn't wait long enough to get a real result. + // But we should be getting at least an epoch marker every checkpoint interval. + logWarning( +s"Completion service failed to make progress after $checkpointIntervalMs ms") + null + +// The completion service guarantees this future will be available immediately. +case future => future.get() match { + case ReceiverRow(writerId, r) => +// Start reading the next element in the queue we just took from. +completion.submit(completionTask(writerId)) +r + // TODO use writerId --- End diff -- It looks like this isn't needed as of now.
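The control flow described in the doc comment of this diff can be modeled outside Spark roughly as follows. This is a simplified, hypothetical sketch, not the PR's code: `CsMessage`, `CsRow`, `CsEpochMarker` and `CompletionServiceEpochReader` are invented names, rows are `String`s, and the "no progress" warning goes to stderr instead of `logWarning`. One outstanding `take()` per writer runs on the pool; `ExecutorCompletionService.poll(timeout, unit)` hands back whichever finishes first; a row triggers a resubmit for that writer, while a marker does not, so the epoch ends once every writer has sent one.

```scala
import java.util.concurrent.{BlockingQueue, Callable, ExecutorCompletionService, ExecutorService, Executors, TimeUnit}

sealed trait CsMessage
final case class CsRow(writerId: Int, value: String) extends CsMessage
final case class CsEpochMarker(writerId: Int) extends CsMessage

final class CompletionServiceEpochReader(
    queues: IndexedSeq[BlockingQueue[CsMessage]],
    pollTimeoutMs: Long) {
  private val executor: ExecutorService = Executors.newFixedThreadPool(queues.length)
  private val completion = new ExecutorCompletionService[CsMessage](executor)
  private val markerReceived = Array.fill(queues.length)(false)

  // Task that blocks on one writer's queue; it runs on the pool, not on the caller's thread.
  private def takeFrom(writerId: Int): Callable[CsMessage] = new Callable[CsMessage] {
    override def call(): CsMessage = queues(writerId).take()
  }

  // Start with one outstanding take() per writer.
  queues.indices.foreach(writerId => completion.submit(takeFrom(writerId)))

  /** Returns the next row of this epoch, or None once every writer has sent its marker. */
  def nextRow(): Option[String] = {
    while (markerReceived.contains(false)) {
      val future = completion.poll(pollTimeoutMs, TimeUnit.MILLISECONDS)
      if (future == null) {
        // No progress within the timeout: report which writers are still missing a marker.
        val waiting = markerReceived.indices.filter(i => !markerReceived(i))
        System.err.println(s"No progress after $pollTimeoutMs ms; waiting on writers ${waiting.mkString(", ")}")
      } else {
        // Futures returned by the completion service are already done, so get() does not block.
        future.get() match {
          case CsRow(writerId, value) =>
            completion.submit(takeFrom(writerId)) // keep reading from the writer we just took from
            return Some(value)
          case CsEpochMarker(writerId) =>
            markerReceived(writerId) = true // no resubmit: this writer is done for the epoch
        }
      }
    }
    None
  }

  def close(): Unit = executor.shutdownNow()
}
```

Because a writer's next `take()` is only submitted after its previous message is consumed, each writer has at most one message in flight, and it stops being read as soon as its marker arrives.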
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190125731 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = +mutable.Map.empty[Int, Boolean].withDefaultValue(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) --- End diff -- As I commented earlier in the design doc, I was in favor of a single queue, because I thought it minimizes the thread count, which may avoid unnecessary contention (as well as code complexity in this case), and also keeps the backpressure condition fairly simple (RPC requests block indefinitely until the queue has room to write). But after reading some articles comparing `multiple writers, single reader on single queue` with `single writer, single reader on multiple queues per writer`, the second approach looks like it performs better unless we introduce a highly optimized queue like Disruptor. So the approach looks great to me for now, and we could consider adopting a queue library if we run into a bad situation.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190120836 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = --- End diff -- The map will only ever contain `(writerId, true)` entries, so the value carries no information, and the only keys we care about are writer IDs in the range `0 until numShuffleWriters`; it might be worth considering an alternative. This could also be a Set pre-initialized to `0 until numShuffleWriters`, from which we remove an element when we receive its marker. If an element is still in the set, we haven't received a marker from that writer yet. Along the same lines, it could be a pre-initialized Array of Boolean.
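The Set-based alternative suggested here can be sketched with a hypothetical `PendingWriters` helper (not part of the PR). The set starts as `0 until numWriters`, and each marker removes its writer. Because `mutable.Set.remove` returns whether the element was present, the same structure also detects a duplicate marker, and whatever is left in the set is exactly the list of writers still being waited on.

```scala
import scala.collection.mutable

// Writers that still owe an epoch marker; each marker removes its writer.
final class PendingWriters(numWriters: Int) {
  private val pending = mutable.Set.empty[Int] ++= (0 until numWriters)

  /** Records a marker; returns false if this writer's marker was already received. */
  def markerFrom(writerId: Int): Boolean = pending.remove(writerId)

  def epochComplete: Boolean = pending.isEmpty

  /** The writers still being waited on, e.g. for a "no progress" log message. */
  def stillWaitingOn: Set[Int] = pending.toSet
}
```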
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190088531 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + private val numWriterEpochMarkers = new AtomicInteger(0) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + override def getNext(): UnsafeRow = { +completion.take().get() match { + case ReceiverRow(writerId, r) => +// Start reading the next element in the queue we just took from. +completion.submit(completionTask(writerId)) +r +// TODO use writerId + case ReceiverEpochMarker(writerId) => +// Don't read any more from this queue. If all the writers have sent epoch markers, +// the epoch is over; otherwise we need rows from one of the remaining writers. +val writersCompleted = numWriterEpochMarkers.incrementAndGet() --- End diff -- I think a map ended up being a more natural way to express the logic.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190088467 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + private val numWriterEpochMarkers = new AtomicInteger(0) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + override def getNext(): UnsafeRow = { +completion.take().get() match { + case ReceiverRow(writerId, r) => +// Start reading the next element in the queue we just took from. +completion.submit(completionTask(writerId)) +r +// TODO use writerId --- End diff -- It was for the verification you mentioned below. I left it as a marker for myself and then forgot to go back to it :(
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190082630 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- @@ -161,13 +189,15 @@ class ContinuousShuffleReadSuite extends StreamTest { test("blocks waiting for new rows") { val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val epoch = rdd.compute(rdd.partitions(0), ctx) val readRowThread = new Thread { override def run(): Unit = { -// set the non-inheritable thread local -TaskContext.setTaskContext(ctx) -val epoch = rdd.compute(rdd.partitions(0), ctx) -epoch.next().getInt(0) +try { --- End diff -- Okay, that makes sense. I was worried it was for some nasty error condition that I missed in the previous PR review.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190080895 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- @@ -161,13 +189,15 @@ class ContinuousShuffleReadSuite extends StreamTest { test("blocks waiting for new rows") { val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val epoch = rdd.compute(rdd.partitions(0), ctx) val readRowThread = new Thread { override def run(): Unit = { -// set the non-inheritable thread local -TaskContext.setTaskContext(ctx) -val epoch = rdd.compute(rdd.partitions(0), ctx) -epoch.next().getInt(0) +try { --- End diff -- I realized when writing a test further down that it was cleaner to get the RDD outside the thread, and that not catching the InterruptedException was spamming my console.
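The test pattern described here, a reader thread parked in a blocking call and then interrupted at teardown, can be reproduced in isolation with plain JDK queues. In this sketch `BlockedReaderDemo.observe` is a hypothetical helper, not Spark test code. It runs a blocking call on an empty `ArrayBlockingQueue`, waits until the thread parks, interrupts it, and reports the observed `Thread.State` together with whether the `InterruptedException` was caught. A thread blocked in `take()` reports `WAITING`, while one blocked in a timed `poll(timeout, unit)`, which is what the later `getNext()` revision uses, reports `TIMED_WAITING`.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object BlockedReaderDemo {
  /** Runs `block` on an empty queue in a new thread, waits until that thread parks,
    * interrupts it, and returns (state while parked, whether InterruptedException was caught). */
  def observe(block: ArrayBlockingQueue[Int] => Unit): (Thread.State, Boolean) = {
    val queue = new ArrayBlockingQueue[Int](1)
    val caught = new AtomicBoolean(false)
    val reader = new Thread {
      override def run(): Unit =
        try block(queue)
        catch { case _: InterruptedException => caught.set(true) } // expected at teardown
    }
    reader.start()
    // Spin until the reader is parked inside the blocking call.
    var state = reader.getState
    while (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING) {
      Thread.sleep(5)
      state = reader.getState
    }
    reader.interrupt()
    reader.join()
    (state, caught.get)
  }
}
```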
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190061592 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + private val numWriterEpochMarkers = new AtomicInteger(0) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + override def getNext(): UnsafeRow = { +completion.take().get() match { --- End diff -- `take().get()` can block indefinitely if there are issues such as a marker never being received from a writer. Instead, I suggest a small change: put this in a loop (also suggested below) and use `poll(timeout)`. Every time it times out, print which writers (or how many) it is still waiting for. This would be super helpful for debugging such issues, without adding much complexity. The timeout could be the epoch interval, because we expect markers to be received within that period.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190069654 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- @@ -181,4 +211,80 @@ class ContinuousShuffleReadSuite extends StreamTest { readRowThread.join() } } + + test("epoch only ends when all writers send markers") { +val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1, numShuffleWriters = 3) +val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint +send( + endpoint, + ReceiverRow(0, unsafeRow("writer0-row0")), + ReceiverRow(1, unsafeRow("writer1-row0")), + ReceiverRow(2, unsafeRow("writer2-row0")), + ReceiverEpochMarker(0), + ReceiverEpochMarker(2) +) + +val epoch = rdd.compute(rdd.partitions(0), ctx) +val rows = (0 until 3).map(_ => epoch.next()).toSet +assert(rows.map(_.getUTF8String(0).toString) == + Set("writer0-row0", "writer1-row0", "writer2-row0")) + +// After checking the right rows, block until we get an epoch marker indicating there's no next. +// (Also fail the assertion if for some reason we get a row.) +val readEpochMarkerThread = new Thread { + override def run(): Unit = { +assert(!epoch.hasNext) + } +} + +readEpochMarkerThread.start() + --- End diff -- nit: remove empty line.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190059433 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + private val numWriterEpochMarkers = new AtomicInteger(0) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + override def getNext(): UnsafeRow = { +completion.take().get() match { + case ReceiverRow(writerId, r) => +// Start reading the next element in the queue we just took from. +completion.submit(completionTask(writerId)) +r +// TODO use writerId --- End diff -- What is this TODO for?
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190069533 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- @@ -115,17 +124,36 @@ class ContinuousShuffleReadSuite extends StreamTest { assert(secondEpoch.toSeq.map(_.getInt(0)) == Seq(222, 333)) } + test("multiple writers") { --- End diff -- nit: move this below, with all the other multi-writer tests.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190056646 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + private val numWriterEpochMarkers = new AtomicInteger(0) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) --- End diff -- It would be nice to add a bit of documentation on the control flow.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190069145 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- @@ -161,13 +189,15 @@ class ContinuousShuffleReadSuite extends StreamTest { test("blocks waiting for new rows") { val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val epoch = rdd.compute(rdd.partitions(0), ctx) val readRowThread = new Thread { override def run(): Unit = { -// set the non-inheritable thread local -TaskContext.setTaskContext(ctx) -val epoch = rdd.compute(rdd.partitions(0), ctx) -epoch.next().getInt(0) +try { --- End diff -- Why this change?
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190060224 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + private val numWriterEpochMarkers = new AtomicInteger(0) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + override def getNext(): UnsafeRow = { +completion.take().get() match { + case ReceiverRow(writerId, r) => +// Start reading the next element in the queue we just took from. +completion.submit(completionTask(writerId)) +r +// TODO use writerId + case ReceiverEpochMarker(writerId) => +// Don't read any more from this queue. If all the writers have sent epoch markers, +// the epoch is over; otherwise we need rows from one of the remaining writers. +val writersCompleted = numWriterEpochMarkers.incrementAndGet() --- End diff -- I think you should also verify that we don't accidentally receive two markers from the same ID. So instead of a count, we should track an array of which writers are still left to be committed.
That may also be useful for debugging issues like the queue never receiving a marker from a writer for some reason.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190058882 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -41,11 +44,14 @@ private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessa */ --- End diff -- update docs.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190057347 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + private val numWriterEpochMarkers = new AtomicInteger(0) --- End diff -- Does this need to be an AtomicInteger? It seems to be incremented and read only from the main thread draining the iterator below.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190057920

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver(
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case r: UnsafeRowReceiverMessage =>
-      queue.put(r)
+      queues(r.writerId).put(r)
       context.reply(())
   }
   override def read(): Iterator[UnsafeRow] = {
     new NextIterator[UnsafeRow] {
-      override def getNext(): UnsafeRow = queue.take() match {
-        case ReceiverRow(r) => r
-        case ReceiverEpochMarker() =>
-          finished = true
-          null
+      private val numWriterEpochMarkers = new AtomicInteger(0)
+
+      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+      private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+      private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+        override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
       }
-      override def close(): Unit = {}
+      (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+      override def getNext(): UnsafeRow = {
+        completion.take().get() match {
+          case ReceiverRow(writerId, r) =>
+            // Start reading the next element in the queue we just took from.
+            completion.submit(completionTask(writerId))
+            r
+            // TODO use writerId
+          case ReceiverEpochMarker(writerId) =>
+            // Don't read any more from this queue. If all the writers have sent epoch markers,
+            // the epoch is over; otherwise we need rows from one of the remaining writers.
+            val writersCompleted = numWriterEpochMarkers.incrementAndGet()
+            if (writersCompleted == numShuffleWriters) {
+              finished = true
+              null
+            } else {
+              getNext()
--- End diff --

With a large number of shuffle writers, there is a very slight chance that this can lead to a stack overflow: if you happen to get a run of N markers from N writers and N is large, each marker adds another recursive getNext() frame. Make this a while loop.
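A minimal sketch of the suggested while loop, assuming hypothetical stand-ins (`Msg`, `Row`, `Marker`, `EpochReader`) and a `next` function in place of `completion.take().get()`. It returns `Option[Row]` instead of following the `NextIterator` convention of setting `finished = true` and returning `null`, and it leaves resubmitting the read task for a writer to `next`.

```scala
// Hypothetical stand-ins for ReceiverRow / ReceiverEpochMarker.
sealed trait Msg
final case class Row(writerId: Int, value: Int) extends Msg
final case class Marker(writerId: Int) extends Msg

/**
 * Hypothetical sketch of getNext() written as a loop: epoch markers are consumed
 * in place instead of calling getNext() recursively, so N back-to-back markers
 * cost N loop iterations rather than N stack frames.
 */
final class EpochReader(next: () => Msg, numWriters: Int) {
  // Only touched by the thread calling getNext().
  private var markersSeen = 0

  /** Returns the next row, or None once every writer has sent its epoch marker. */
  def getNext(): Option[Row] = {
    var result: Option[Row] = None
    while (result.isEmpty && markersSeen < numWriters) {
      next() match {
        case r: Row    => result = Some(r)
        case Marker(_) => markersSeen += 1 // skip the marker and keep looping
      }
    }
    result
  }
}
```

Because markers are consumed inside the loop, stack depth stays constant no matter how many writers report back-to-back.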
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/21385

[SPARK-24234][SS] Support multiple row writers in continuous processing shuffle reader.

## What changes were proposed in this pull request?

https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii

Support multiple different row writers in the continuous processing shuffle reader.

Note that having multiple read-side buffers ended up being the natural way to do this. Otherwise it's hard to express the constraint of sending an epoch marker only when all writers have sent one.

## How was this patch tested?

New unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark multipleWrite

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21385.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #21385

commit 1d6b71898e2a640e3c0809695d2b83f3f84eaa38
Author: Jose Torres
Date: 2018-05-15T18:07:54Z
    continuous shuffle read RDD
commit b5d100875932bdfcb645c8f6b2cdb7b815d84c80
Author: Jose Torres
Date: 2018-05-17T03:11:11Z
    docs
commit af407694a5f13c18568da4a63848f82374a44377
Author: Jose Torres
Date: 2018-05-17T03:19:37Z
    Merge remote-tracking branch 'apache/master' into readerRddMaster
commit 46456dc75a6aec9659b18523c421999debd060eb
Author: Jose Torres
Date: 2018-05-17T03:22:49Z
    fix ctor
commit 2ea8a6f94216e8b184e5780ec3e6ffb2838de382
Author: Jose Torres
Date: 2018-05-17T03:43:10Z
    multiple partition test
commit 955ac79eb05dc389e632d1aaa6c59396835c6ed5
Author: Jose Torres
Date: 2018-05-17T13:33:51Z
    unset task context after test
commit 8cefb724512b51f2aa1fdd81fa8a2d4560e60ce3
Author: Jose Torres
Date: 2018-05-18T00:00:05Z
    conf from RDD
commit f91bfe7e3fc174202d7d5c7cde5a8fb7ce86bfd3
Author: Jose Torres
Date: 2018-05-18T00:00:44Z
    endpoint name
commit 259029298fc42a65e8ebb4d2effe49b7fafa96f1
Author: Jose Torres
Date: 2018-05-18T00:02:08Z
    testing bool
commit 859e6e4dd4dd90ffd70fc9cbd243c94090d72506
Author: Jose Torres
Date: 2018-05-18T00:22:10Z
    tests
commit b23b7bb17abe3cbc873a3144c56d08c88bc0c963
Author: Jose Torres
Date: 2018-05-18T00:40:55Z
    take instead of poll
commit 97f7e8ff865e6054d0d70914ce9bb51880b161f6
Author: Jose Torres
Date: 2018-05-18T00:58:44Z
    add interface
commit de21b1c25a333d44c0521fe151b468e51f0bdc47
Author: Jose Torres
Date: 2018-05-18T01:02:37Z
    clarify comment
commit 7dcf51a13e92a0bb2998e2a12e67d351e1c1a4fc
Author: Jose Torres
Date: 2018-05-18T22:39:28Z
    multiple
commit 154843d799683c5cdfc035033475f223f85f0d66
Author: Jose Torres
Date: 2018-05-18T22:41:18Z
    don't use spark conf for the sql conf
commit f0262d0a9d3539bcf8fbdbb248968fd704d1e690
Author: Jose Torres
Date: 2018-05-18T22:54:27Z
    end thread
commit 3e7a6f9d31967d9efc618c4d319a9dabd22ae4e5
Author: Jose Torres
Date: 2018-05-18T22:54:54Z
    name thread
commit 0a38ced23b7e1a6dfe9588ef0ebf7c071a08055d
Author: Jose Torres
Date: 2018-05-18T22:55:14Z
    no toString
commit ef34e6e9817274df9378341bfb52105c591a5507
Author: Jose Torres
Date: 2018-05-18T23:00:01Z
    send method
commit 00f910ea39b76a24e1e21acdf3d6a20fd7784fa9
Author: Jose Torres
Date: 2018-05-18T23:02:10Z
    fix
commit aa463ddfd6dcc860f3a9119afe5db32db06945d8
Author: Jose Torres
Date: 2018-05-21T16:27:23Z
    Merge branch 'readerRddMaster' into multipleWrite
commit 504bf7426acf16cce21c549e494b8149dbaa3774
Author: Jose Torres
Date: 2018-05-21T17:13:33Z
    add test
commit 79c3158fc4bb70eb145503699eb6007a029e0c6c
Author: Jose Torres
Date: 2018-05-21T20:10:06Z
    Merge remote-tracking branch 'apache/master' into multipleWrite