[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r191149214
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -41,11 +50,15 @@ private[shuffle] case class ReceiverEpochMarker() 
extends UnsafeRowReceiverMessa
  */
 private[shuffle] class UnsafeRowReceiver(
   queueSize: Int,
+  numShuffleWriters: Int,
+  checkpointIntervalMs: Long,
   override val rpcEnv: RpcEnv)
 extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with 
Logging {
   // Note that this queue will be drained from the main task thread and 
populated in the RPC
   // response thread.
-  private val queue = new 
ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+  private val queues = Array.fill(numShuffleWriters) {
--- End diff --

Hi TD, just a question here: what does the 'non-RPC-endpoint-based transfer 
mechanism' refer to?


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21385


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190730661
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest {
   }
 
   test("blocks waiting for new rows") {
-val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val rdd = new ContinuousShuffleReadRDD(
+  sparkContext, numPartitions = 1, checkpointIntervalMs = 
Long.MaxValue)
+val epoch = rdd.compute(rdd.partitions(0), ctx)
 
 val readRowThread = new Thread {
   override def run(): Unit = {
-// set the non-inheritable thread local
-TaskContext.setTaskContext(ctx)
-val epoch = rdd.compute(rdd.partitions(0), ctx)
-epoch.next().getInt(0)
+try {
+  epoch.next().getInt(0)
+} catch {
+  case _: InterruptedException => // do nothing - expected at test 
ending
+}
   }
 }
 
 try {
   readRowThread.start()
   eventually(timeout(streamingTimeout)) {
-assert(readRowThread.getState == Thread.State.WAITING)
+assert(readRowThread.getState == Thread.State.TIMED_WAITING)
--- End diff --

I see. That's good to know!


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-24 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190684685
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest {
   }
 
   test("blocks waiting for new rows") {
-val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val rdd = new ContinuousShuffleReadRDD(
+  sparkContext, numPartitions = 1, checkpointIntervalMs = 
Long.MaxValue)
+val epoch = rdd.compute(rdd.partitions(0), ctx)
 
 val readRowThread = new Thread {
   override def run(): Unit = {
-// set the non-inheritable thread local
-TaskContext.setTaskContext(ctx)
-val epoch = rdd.compute(rdd.partitions(0), ctx)
-epoch.next().getInt(0)
+try {
+  epoch.next().getInt(0)
+} catch {
+  case _: InterruptedException => // do nothing - expected at test 
ending
+}
   }
 }
 
 try {
   readRowThread.start()
   eventually(timeout(streamingTimeout)) {
-assert(readRowThread.getState == Thread.State.WAITING)
+assert(readRowThread.getState == Thread.State.TIMED_WAITING)
--- End diff --

It changed because we added a timeout. The JVM always puts threads waiting 
without a timeout into WAITING, and threads waiting with a timeout into 
TIMED_WAITING.
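
For reference, a minimal standalone sketch (not from the PR; names are 
illustrative) showing the distinction — a thread parked in an untimed 
`take()` reports WAITING, while one parked in `poll(timeout, unit)` reports 
TIMED_WAITING, even when the timeout is effectively infinite as in the test:

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object ThreadStateDemo extends App {
  val queue = new ArrayBlockingQueue[String](1)

  // Untimed wait: take() parks with no deadline -> WAITING.
  val untimed = new Thread {
    override def run(): Unit = {
      try { queue.take() } catch { case _: InterruptedException => }
    }
  }
  // Timed wait: poll(timeout) parks with a deadline -> TIMED_WAITING,
  // even for Long.MaxValue, which is what the test passes.
  val timed = new Thread {
    override def run(): Unit = {
      try { queue.poll(Long.MaxValue, TimeUnit.MILLISECONDS) } catch {
        case _: InterruptedException =>
      }
    }
  }

  untimed.start(); timed.start()
  Thread.sleep(100)  // let both threads block; a real test would retry

  assert(untimed.getState == Thread.State.WAITING)
  assert(timed.getState == Thread.State.TIMED_WAITING)

  untimed.interrupt(); timed.interrupt()
}
```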


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190403472
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest {
   }
 
   test("blocks waiting for new rows") {
-val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val rdd = new ContinuousShuffleReadRDD(
+  sparkContext, numPartitions = 1, checkpointIntervalMs = 
Long.MaxValue)
+val epoch = rdd.compute(rdd.partitions(0), ctx)
 
 val readRowThread = new Thread {
   override def run(): Unit = {
-// set the non-inheritable thread local
-TaskContext.setTaskContext(ctx)
-val epoch = rdd.compute(rdd.partitions(0), ctx)
-epoch.next().getInt(0)
+try {
+  epoch.next().getInt(0)
+} catch {
+  case _: InterruptedException => // do nothing - expected at test 
ending
+}
   }
 }
 
 try {
   readRowThread.start()
   eventually(timeout(streamingTimeout)) {
-assert(readRowThread.getState == Thread.State.WAITING)
+assert(readRowThread.getState == Thread.State.TIMED_WAITING)
   }
 } finally {
   readRowThread.interrupt()
   readRowThread.join()
 }
   }
+
+  test("multiple writers") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 
1, numShuffleWriters = 3)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+send(
+  endpoint,
+  ReceiverRow(0, unsafeRow("writer0-row0")),
+  ReceiverRow(1, unsafeRow("writer1-row0")),
+  ReceiverRow(2, unsafeRow("writer2-row0")),
+  ReceiverEpochMarker(0),
+  ReceiverEpochMarker(1),
+  ReceiverEpochMarker(2)
+)
+
+val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(firstEpoch.toSeq.map(_.getUTF8String(0).toString).toSet ==
+  Set("writer0-row0", "writer1-row0", "writer2-row0"))
+  }
+
+  test("epoch only ends when all writers send markers") {
+val rdd = new ContinuousShuffleReadRDD(
+  sparkContext, numPartitions = 1, numShuffleWriters = 3, 
checkpointIntervalMs = Long.MaxValue)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+send(
+  endpoint,
+  ReceiverRow(0, unsafeRow("writer0-row0")),
+  ReceiverRow(1, unsafeRow("writer1-row0")),
+  ReceiverRow(2, unsafeRow("writer2-row0")),
+  ReceiverEpochMarker(0),
+  ReceiverEpochMarker(2)
+)
+
+val epoch = rdd.compute(rdd.partitions(0), ctx)
+val rows = (0 until 3).map(_ => epoch.next()).toSet
+assert(rows.map(_.getUTF8String(0).toString) ==
+  Set("writer0-row0", "writer1-row0", "writer2-row0"))
+
+// After checking the right rows, block until we get an epoch marker 
indicating there's no next.
+// (Also fail the assertion if for some reason we get a row.)
+val readEpochMarkerThread = new Thread {
+  override def run(): Unit = {
+assert(!epoch.hasNext)
+  }
+}
+
+readEpochMarkerThread.start()
+eventually(timeout(streamingTimeout)) {
+  assert(readEpochMarkerThread.getState == Thread.State.TIMED_WAITING)
--- End diff --

Same question as above: is this the only possible thread state?


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190403327
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest {
   }
 
   test("blocks waiting for new rows") {
-val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val rdd = new ContinuousShuffleReadRDD(
+  sparkContext, numPartitions = 1, checkpointIntervalMs = 
Long.MaxValue)
+val epoch = rdd.compute(rdd.partitions(0), ctx)
 
 val readRowThread = new Thread {
   override def run(): Unit = {
-// set the non-inheritable thread local
-TaskContext.setTaskContext(ctx)
-val epoch = rdd.compute(rdd.partitions(0), ctx)
-epoch.next().getInt(0)
+try {
+  epoch.next().getInt(0)
+} catch {
+  case _: InterruptedException => // do nothing - expected at test 
ending
+}
   }
 }
 
 try {
   readRowThread.start()
   eventually(timeout(streamingTimeout)) {
-assert(readRowThread.getState == Thread.State.WAITING)
+assert(readRowThread.getState == Thread.State.TIMED_WAITING)
--- End diff --

Why did this change from WAITING to TIMED_WAITING? Can the thread be in one 
state or the other? Would this cause flakiness?


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190402783
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
+  null
+
+// The completion service guarantees this future will be 
available immediately.
+case future => future.get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just 
took from.
+completion.submit(completionTask(writerId))
+r
+  case ReceiverEpochMarker(writerId) =>
+// Don't read any more from this queue. If all the writers 
have sent epoch markers,
+// the epoch is over; otherwise we need to poll from the 
remaining writers.
+writerEpochMarkersReceived(writerId) = true
+if (writerEpochMarkersReceived.forall(flag => flag)) {
+  finished = true
+  // Break out of the while loop and end the iterator.
+  return null
--- End diff --

Actually ... `finished` is the flag; just use that for the while loop, that 
is, `while (!finished && nextRow == null)`.
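
A sketch of the suggested shape, using the names from the diff above (just 
the restructuring, not a tested implementation):

```scala
override def getNext(): UnsafeRow = {
  var nextRow: UnsafeRow = null
  // `finished` already tells NextIterator the iterator is exhausted, so it
  // can double as the loop condition instead of an early `return null`.
  while (!finished && nextRow == null) {
    nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match {
      case null =>
        logWarning(
          s"Completion service failed to make progress after $checkpointIntervalMs ms")
        null
      case future => future.get() match {
        case ReceiverRow(writerId, r) =>
          completion.submit(completionTask(writerId))
          r
        case ReceiverEpochMarker(writerId) =>
          writerEpochMarkersReceived(writerId) = true
          if (writerEpochMarkersReceived.forall(_ == true)) {
            finished = true  // the loop condition now ends the iteration
          }
          null
      }
    }
  }
  nextRow
}
```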


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190402584
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
+  null
+
+// The completion service guarantees this future will be 
available immediately.
+case future => future.get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just 
took from.
+completion.submit(completionTask(writerId))
+r
+  case ReceiverEpochMarker(writerId) =>
+// Don't read any more from this queue. If all the writers 
have sent epoch markers,
+// the epoch is over; otherwise we need to poll from the 
remaining writers.
+writerEpochMarkersReceived(writerId) = true
+if (writerEpochMarkersReceived.forall(flag => flag)) {
+  finished = true
+  // Break out of the while loop and end the iterator.
+  return null
+} else {
+  // Poll again for the next completion result.
+  null
+}
--- End diff --

If you put `nextRow = newReceivedRow` inside the `case ReceiverRow`, then 
this else clause is not needed.
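
A sketch of that shape (`newReceivedRow` is the reviewer's placeholder for 
the row bound in the `ReceiverRow` case; the diff calls it `r`). With the 
match used as a statement, neither arm needs to yield a value:

```scala
completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match {
  case null =>
    logWarning(
      s"Completion service failed to make progress after $checkpointIntervalMs ms")
  case future => future.get() match {
    case ReceiverRow(writerId, r) =>
      // Assign directly instead of yielding a value from the match.
      completion.submit(completionTask(writerId))
      nextRow = r
    case ReceiverEpochMarker(writerId) =>
      writerEpochMarkersReceived(writerId) = true
      if (writerEpochMarkersReceived.forall(_ == true)) {
        finished = true
      }
      // No else branch needed: the enclosing loop simply polls again.
  }
}
```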


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190401710
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
+  null
+
+// The completion service guarantees this future will be 
available immediately.
+case future => future.get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just 
took from.
+completion.submit(completionTask(writerId))
+r
+  case ReceiverEpochMarker(writerId) =>
+// Don't read any more from this queue. If all the writers 
have sent epoch markers,
+// the epoch is over; otherwise we need to poll from the 
remaining writers.
+writerEpochMarkersReceived(writerId) = true
+if (writerEpochMarkersReceived.forall(flag => flag)) {
--- End diff --

super nit: `writerEpochMarkersReceived.forall(_ == true)` is easier to 
understand.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190401100
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
+  null
+
+// The completion service guarantees this future will be 
available immediately.
+case future => future.get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just 
took from.
+completion.submit(completionTask(writerId))
+r
+  case ReceiverEpochMarker(writerId) =>
+// Don't read any more from this queue. If all the writers 
have sent epoch markers,
+// the epoch is over; otherwise we need to poll from the 
remaining writers.
+writerEpochMarkersReceived(writerId) = true
+if (writerEpochMarkersReceived.forall(flag => flag)) {
+  finished = true
+  // Break out of the while loop and end the iterator.
+  return null
--- End diff --

super nit: I personally find this sort of escape hard to read in code. 
Consider making a separate flag to end the while loop.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190400926
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
--- End diff --

Move this inside... it's hard to follow the pattern `x = /* some large code 
structure */`.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190398051
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
--- End diff --

Maybe also print which writers (and how many) we are waiting on, so that we 
can debug.
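
One possible shape for that, as a sketch against the `case null =>` branch 
in this diff (the `pendingWriters` helper is hypothetical; it assumes the 
`writerEpochMarkersReceived` array from the surrounding code):

```scala
// Hypothetical helper: writers whose epoch marker we have not yet seen.
private def pendingWriters =
  writerEpochMarkersReceived.zipWithIndex.collect { case (false, id) => id }

// In the `case null =>` branch of getNext():
logWarning(
  s"Completion service failed to make progress after $checkpointIntervalMs ms; " +
    s"still waiting on ${pendingWriters.length} writer(s): " +
    pendingWriters.mkString(", "))
```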


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190396813
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -42,16 +47,24 @@ case class ContinuousShuffleReadPartition(index: Int, 
queueSize: Int) extends Pa
  * RDD at the map side of each continuous processing shuffle task. 
Upstream tasks send their
  * shuffle output to the wrapped receivers in partitions of this RDD; each 
of the RDD's tasks
  * poll from their receiver until an epoch marker is sent.
+ *
+ * @param sc the RDD context
+ * @param numPartitions the number of read partitions for this RDD
+ * @param queueSize the size of the row buffers to use
+ * @param numShuffleWriters the number of continuous shuffle writers 
feeding into this RDD
+ * @param checkpointIntervalMs the checkpoint interval of the streaming 
query
  */
 class ContinuousShuffleReadRDD(
 sc: SparkContext,
 numPartitions: Int,
-queueSize: Int = 1024)
+queueSize: Int = 1024,
+numShuffleWriters: Int = 1,
+checkpointIntervalMs: Long = 1000)
--- End diff --

Isn't this the same as epochInterval? The term "epoch" is better known in 
the code than "checkpoint".


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190354487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -41,11 +50,15 @@ private[shuffle] case class ReceiverEpochMarker() 
extends UnsafeRowReceiverMessa
  */
 private[shuffle] class UnsafeRowReceiver(
   queueSize: Int,
+  numShuffleWriters: Int,
+  checkpointIntervalMs: Long,
   override val rpcEnv: RpcEnv)
 extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with 
Logging {
   // Note that this queue will be drained from the main task thread and 
populated in the RPC
   // response thread.
-  private val queue = new 
ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+  private val queues = Array.fill(numShuffleWriters) {
--- End diff --

I agree. This level of issue can be dealt with later, once we run this and 
observe the issues for real. Also, it's likely that we will eventually replace 
a whole lot of this with a non-RPC-endpoint-based transfer mechanism. So let's 
keep something very simple for now.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190331721
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --

Ah, yes, the n queues make it difficult. It would have been straightforward 
if there were only one queue. I guess here you would have to round-robin and 
poll the queues with very small timeouts so that you don't block on any one 
queue. If this is a prototype, either approach is fine.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190328335
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --

I tried to make that work, but I couldn't figure out how to determine from 
the task thread which queue is the right one. We can't round-robin, or else 
we'll fail to make progress if some writers send more data than others.

As mentioned in the doc, this iteration of the shuffle functionality isn't 
meant to be particularly scalable - we're already incurring a bunch of overhead 
from using the RPC framework to send data around. The focus for now is 
correctness and debuggability. Once we have everything in place, we'll swap out 
everything between ContinuousShuffleReader and ContinuousShuffleWriter with 
some sort of TCP socket approach, which will probably be able to do away with 
the N queues entirely by leveraging TCP backpressure.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190327022
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -41,11 +50,15 @@ private[shuffle] case class ReceiverEpochMarker() 
extends UnsafeRowReceiverMessa
  */
 private[shuffle] class UnsafeRowReceiver(
   queueSize: Int,
+  numShuffleWriters: Int,
+  checkpointIntervalMs: Long,
   override val rpcEnv: RpcEnv)
 extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with 
Logging {
   // Note that this queue will be drained from the main task thread and 
populated in the RPC
   // response thread.
-  private val queue = new 
ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+  private val queues = Array.fill(numShuffleWriters) {
--- End diff --

Then it would be a configuration nightmare. It should be OK to start with the 
proposed approach and use the same size for all the queues. We can change it 
later if we see issues, as long as the internal details are not exposed to the 
user.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190324822
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -41,11 +50,15 @@ private[shuffle] case class ReceiverEpochMarker() 
extends UnsafeRowReceiverMessa
  */
 private[shuffle] class UnsafeRowReceiver(
   queueSize: Int,
+  numShuffleWriters: Int,
+  checkpointIntervalMs: Long,
   override val rpcEnv: RpcEnv)
 extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with 
Logging {
   // Note that this queue will be drained from the main task thread and 
populated in the RPC
   // response thread.
-  private val queue = new 
ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+  private val queues = Array.fill(numShuffleWriters) {
--- End diff --

Well, is it a problem if we don't set a single size across all the queues?


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190320670
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --

Are we creating threads equal to the number of writers here, and do we need 
them? I assume there would already be an RDD task thread per partition 
invoking `getNext`; can't we return the row from the right queue(s) there? My 
concern is that these threads are going to bloat the total thread count to m*n 
(RDD partitions * shuffle writers), and this may not be very scalable 
considering the amount of context switching.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190316408
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -41,11 +50,15 @@ private[shuffle] case class ReceiverEpochMarker() 
extends UnsafeRowReceiverMessa
  */
 private[shuffle] class UnsafeRowReceiver(
   queueSize: Int,
+  numShuffleWriters: Int,
+  checkpointIntervalMs: Long,
   override val rpcEnv: RpcEnv)
 extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with 
Logging {
   // Note that this queue will be drained from the main task thread and 
populated in the RPC
   // response thread.
-  private val queue = new 
ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+  private val queues = Array.fill(numShuffleWriters) {
--- End diff --

A queue per writer makes it easier to handle the alignment. However, if the 
queues do not fill evenly, it becomes much more difficult to set a single size 
across all the queues. With a single queue this would not be much of an issue, 
since the queue is shared by all writers.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190305870
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived =
--- End diff --

Good point. I guess that was what TD was suggesting too, now that I think 
about it.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190131693
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived =
+mutable.Map.empty[Int, Boolean].withDefaultValue(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --

And I'm also now seeing this approach as an alternative way to deal with 
alignment (don't buffer rows explicitly; just stop reading after the epoch 
marker comes in). Nice approach.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190129892
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived =
+mutable.Map.empty[Int, Boolean].withDefaultValue(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
+  null
+
+// The completion service guarantees this future will be 
available immediately.
+case future => future.get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just 
took from.
+completion.submit(completionTask(writerId))
+r
+  // TODO use writerId
--- End diff --

It looks like it's not needed as of now.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190125731
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived =
+mutable.Map.empty[Int, Boolean].withDefaultValue(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --

As I commented earlier in the design doc, I was in favor of a single queue, 
because I thought it would minimize the thread count, which may avoid 
unnecessary contention (as well as code complexity in this case), and also 
define the backpressure condition fairly simply (RPC requests block 
indefinitely unless the queue has room to write).

But as I read some articles comparing `multiple writers, single reader on a 
single queue` vs `single writer, single reader on multiple queues per writer`, 
the second approach looks like it performs better unless we introduce a 
highly-optimized queue like the Disruptor.

So the approach looks great to me for now, and at least we could consider 
replacing this with such a queue library if we run into trouble.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190120836
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived =
--- End diff --

The map will only ever contain `(writerId, true)`, so the value is not needed 
at all, and we are only concerned about writer IDs in the range 0 until 
numShuffleWriters, so it might be better to consider an alternative as well.

This could also be a Set pre-initialized to 0 until numShuffleWriters, from 
which we remove an element when we receive that writer's marker. If the 
element is still in the set, it means we haven't received a marker from that 
writer yet.

In a similar approach, it could be a pre-initialized Array of Booleans.
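
A sketch of the Set variant (names hypothetical; assumes `numShuffleWriters` 
and NextIterator's `finished` are in scope):

```scala
import scala.collection.mutable

// Every writer starts out pending; drop its ID when its epoch marker arrives.
private val pendingWriters = mutable.Set(0 until numShuffleWriters: _*)

// In the ReceiverEpochMarker(writerId) case:
pendingWriters -= writerId
if (pendingWriters.isEmpty) {
  finished = true  // all writers have sent their markers for this epoch
}
```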


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190088531
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  private val numWriterEpochMarkers = new AtomicInteger(0)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  override def getNext(): UnsafeRow = {
+completion.take().get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just took 
from.
+completion.submit(completionTask(writerId))
+r
+// TODO use writerId
+  case ReceiverEpochMarker(writerId) =>
+// Don't read any more from this queue. If all the writers 
have sent epoch markers,
+// the epoch is over; otherwise we need rows from one of the 
remaining writers.
+val writersCompleted = numWriterEpochMarkers.incrementAndGet()
--- End diff --

A map ended up being a more natural way to express the logic, I think.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190088467
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  private val numWriterEpochMarkers = new AtomicInteger(0)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  override def getNext(): UnsafeRow = {
+completion.take().get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just took 
from.
+completion.submit(completionTask(writerId))
+r
+// TODO use writerId
--- End diff --

It was for the verification you mentioned below. I left it as a marker for 
myself and then forgot to go back to it :(


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190082630
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -161,13 +189,15 @@ class ContinuousShuffleReadSuite extends StreamTest {
 
   test("blocks waiting for new rows") {
 val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val epoch = rdd.compute(rdd.partitions(0), ctx)
 
 val readRowThread = new Thread {
   override def run(): Unit = {
-// set the non-inheritable thread local
-TaskContext.setTaskContext(ctx)
-val epoch = rdd.compute(rdd.partitions(0), ctx)
-epoch.next().getInt(0)
+try {
--- End diff --

Okay, that makes sense. I was worried it was for some nasty error condition 
that I missed in the previous PR review.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190080895
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -161,13 +189,15 @@ class ContinuousShuffleReadSuite extends StreamTest {
 
   test("blocks waiting for new rows") {
 val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val epoch = rdd.compute(rdd.partitions(0), ctx)
 
 val readRowThread = new Thread {
   override def run(): Unit = {
-// set the non-inheritable thread local
-TaskContext.setTaskContext(ctx)
-val epoch = rdd.compute(rdd.partitions(0), ctx)
-epoch.next().getInt(0)
+try {
--- End diff --

I realized when writing a test further down that it was cleaner to get the 
RDD outside the thread, and that not catching the InterruptedException was 
spamming my console.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190061592
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver(
 
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  private val numWriterEpochMarkers = new AtomicInteger(0)
+
+  private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+  override def getNext(): UnsafeRow = {
+completion.take().get() match {
--- End diff --

`take().get()` can block indefinitely if there happen to be issues like a 
marker never being received from a writer. Instead I suggest a small change: 
put this in a loop (also suggested below) and use `poll(timeout)`. Every time 
it times out, print the status of which / how many writers it is still waiting 
for. This would be super helpful for debugging such issues, without adding much 
complexity. The timeout could be the epoch time interval, because we expect 
markers to be received within that period.
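
For illustration, a minimal sketch of that poll loop, reusing `completion`, 
`completionTask`, `numShuffleWriters`, and the marker count from the diff 
above, and assuming an epoch-interval field like `checkpointIntervalMs` as the 
timeout (a sketch only, not the actual change):

```scala
import java.util.concurrent.TimeUnit

override def getNext(): UnsafeRow = {
  while (!finished) {
    // Unlike take(), poll() returns null on timeout instead of blocking forever.
    val future = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS)
    if (future == null) {
      logWarning(s"Epoch still open; waiting for markers from " +
        s"${numShuffleWriters - numWriterEpochMarkers.get()} of $numShuffleWriters writers")
    } else {
      future.get() match {
        case ReceiverRow(writerId, r) =>
          // Resubmit so we keep draining this writer's queue.
          completion.submit(completionTask(writerId))
          return r
        case ReceiverEpochMarker(_) =>
          if (numWriterEpochMarkers.incrementAndGet() == numShuffleWriters) {
            finished = true
          }
      }
    }
  }
  null
}
```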


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190069654
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -181,4 +211,80 @@ class ContinuousShuffleReadSuite extends StreamTest {
   readRowThread.join()
 }
   }
+
+  test("epoch only ends when all writers send markers") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1, numShuffleWriters = 3)
+val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+send(
+  endpoint,
+  ReceiverRow(0, unsafeRow("writer0-row0")),
+  ReceiverRow(1, unsafeRow("writer1-row0")),
+  ReceiverRow(2, unsafeRow("writer2-row0")),
+  ReceiverEpochMarker(0),
+  ReceiverEpochMarker(2)
+)
+
+val epoch = rdd.compute(rdd.partitions(0), ctx)
+val rows = (0 until 3).map(_ => epoch.next()).toSet
+assert(rows.map(_.getUTF8String(0).toString) ==
+  Set("writer0-row0", "writer1-row0", "writer2-row0"))
+
+// After checking the right rows, block until we get an epoch marker indicating there's no next.
+// (Also fail the assertion if for some reason we get a row.)
+val readEpochMarkerThread = new Thread {
+  override def run(): Unit = {
+assert(!epoch.hasNext)
+  }
+}
+
+readEpochMarkerThread.start()
+
--- End diff --

nit: remove empty line.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190059433
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver(
 
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  private val numWriterEpochMarkers = new AtomicInteger(0)
+
+  private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+  override def getNext(): UnsafeRow = {
+completion.take().get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just took from.
+completion.submit(completionTask(writerId))
+r
+// TODO use writerId
--- End diff --

what is this todo for?



---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190069533
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -115,17 +124,36 @@ class ContinuousShuffleReadSuite extends StreamTest {
 assert(secondEpoch.toSeq.map(_.getInt(0)) == Seq(222, 333))
   }
 
+  test("multiple writers") {
--- End diff --

nit: move this below with all the other multi-writer tests.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190056646
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver(
 
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  private val numWriterEpochMarkers = new AtomicInteger(0)
+
+  private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
--- End diff --

Would be nice to add a bit of docs on the control flow.
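
For example, something along these lines could work (a sketch of possible 
wording, not the final doc):

```scala
// Control flow, roughly:
// - receiveAndReply() puts each incoming message into the queue for its writerId.
// - One completionTask per writer is submitted up front; each task blocks on
//   take() for its own writer's queue.
// - getNext() waits on the completion service for whichever writer produces a
//   message first, and after consuming a row it resubmits a task for that same
//   writer, so there is always at most one outstanding take() per writer.
```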


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190069145
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -161,13 +189,15 @@ class ContinuousShuffleReadSuite extends StreamTest {
 
   test("blocks waiting for new rows") {
 val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val epoch = rdd.compute(rdd.partitions(0), ctx)
 
 val readRowThread = new Thread {
   override def run(): Unit = {
-// set the non-inheritable thread local
-TaskContext.setTaskContext(ctx)
-val epoch = rdd.compute(rdd.partitions(0), ctx)
-epoch.next().getInt(0)
+try {
--- End diff --

why this change?


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190060224
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver(
 
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  private val numWriterEpochMarkers = new AtomicInteger(0)
+
+  private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+  override def getNext(): UnsafeRow = {
+completion.take().get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just took from.
+completion.submit(completionTask(writerId))
+r
+// TODO use writerId
+  case ReceiverEpochMarker(writerId) =>
+// Don't read any more from this queue. If all the writers have sent epoch markers,
+// the epoch is over; otherwise we need rows from one of the remaining writers.
+val writersCompleted = numWriterEpochMarkers.incrementAndGet()
--- End diff --

I think you should also verify that we don't accidentally receive two markers 
from the same id. So instead of a count, we should track an array of what's 
left to be committed. That may also be useful for debugging issues like the 
queue never receiving a marker from a writer for some reason.
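
A sketch of that bookkeeping, against the names in the diff above 
(illustrative only, not the PR's code):

```scala
// One flag per writer instead of a single count.
private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)

// In the ReceiverEpochMarker(writerId) branch:
assert(!writerEpochMarkersReceived(writerId),
  s"Received a second epoch marker from writer $writerId within one epoch")
writerEpochMarkersReceived(writerId) = true
if (writerEpochMarkersReceived.forall(identity)) {
  finished = true
}
// On a poll timeout, the indices still false are exactly the writers being waited on.
```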


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190058882
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -41,11 +44,14 @@ private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessa
  */
--- End diff --

update docs.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190057347
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver(
 
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  private val numWriterEpochMarkers = new AtomicInteger(0)
--- End diff --

does this need an AtomicInteger? Seems like this is only incremented and 
accessed from the main thread draining the iterator below.
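
If so, a plain field would be enough, e.g. (sketch):

```scala
// Only the thread draining the iterator reads or writes this, so no
// atomic type is needed.
private var numWriterEpochMarkers = 0
```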


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190057920
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +62,46 @@ private[shuffle] class UnsafeRowReceiver(
 
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  private val numWriterEpochMarkers = new AtomicInteger(0)
+
+  private val executor = Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
+
+  override def getNext(): UnsafeRow = {
+completion.take().get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just took from.
+completion.submit(completionTask(writerId))
+r
+// TODO use writerId
+  case ReceiverEpochMarker(writerId) =>
+// Don't read any more from this queue. If all the writers have sent epoch markers,
+// the epoch is over; otherwise we need rows from one of the remaining writers.
+val writersCompleted = numWriterEpochMarkers.incrementAndGet()
+if (writersCompleted == numShuffleWriters) {
+  finished = true
+  null
+} else {
+  getNext()
--- End diff --

With a large number of shuffle writers, there is a very slight chance that 
this can lead to a stack overflow: when you accidentally get a sequence of N 
markers from N writers and N is large. Make this a while loop.
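
One possible shape, sketched against the names in the diff above: a private 
tail-recursive helper that the compiler verifies and rewrites into a loop 
(a plain `while` inside `getNext()` would work just as well):

```scala
import scala.annotation.tailrec

// @tailrec makes the compiler turn this into a loop, so N consecutive
// markers cannot grow the stack; getNext() would simply delegate to it.
@tailrec
private def takeNext(): UnsafeRow = completion.take().get() match {
  case ReceiverRow(writerId, r) =>
    completion.submit(completionTask(writerId))
    r
  case ReceiverEpochMarker(_) =>
    if (numWriterEpochMarkers.incrementAndGet() == numShuffleWriters) {
      finished = true
      null
    } else {
      takeNext()
    }
}
```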


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-21 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/21385

[SPARK-24234][SS] Support multiple row writers in continuous processing 
shuffle reader.

## What changes were proposed in this pull request?


https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii

Support multiple different row writers in continuous processing shuffle 
reader.

Note that having multiple read-side buffers ended up being the natural way 
to do this. Otherwise it's hard to express the constraint of sending an epoch 
marker only when all writers have sent one.
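
A toy sketch of the buffer structure this describes, with hypothetical names 
(not the PR's code):

```scala
import java.util.concurrent.ArrayBlockingQueue

// One queue per writer: rows and markers go into their writer's queue, so the
// reader can tell when every writer has sent its marker for the current epoch.
class PerWriterBuffers[T](numWriters: Int, queueSize: Int) {
  private val queues = Array.fill(numWriters)(new ArrayBlockingQueue[T](queueSize))

  def put(writerId: Int, msg: T): Unit = queues(writerId).put(msg)
  def take(writerId: Int): T = queues(writerId).take()
}
```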

## How was this patch tested?

new unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark multipleWrite

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21385.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21385


commit 1d6b71898e2a640e3c0809695d2b83f3f84eaa38
Author: Jose Torres 
Date:   2018-05-15T18:07:54Z

continuous shuffle read RDD

commit b5d100875932bdfcb645c8f6b2cdb7b815d84c80
Author: Jose Torres 
Date:   2018-05-17T03:11:11Z

docs

commit af407694a5f13c18568da4a63848f82374a44377
Author: Jose Torres 
Date:   2018-05-17T03:19:37Z

Merge remote-tracking branch 'apache/master' into readerRddMaster

commit 46456dc75a6aec9659b18523c421999debd060eb
Author: Jose Torres 
Date:   2018-05-17T03:22:49Z

fix ctor

commit 2ea8a6f94216e8b184e5780ec3e6ffb2838de382
Author: Jose Torres 
Date:   2018-05-17T03:43:10Z

multiple partition test

commit 955ac79eb05dc389e632d1aaa6c59396835c6ed5
Author: Jose Torres 
Date:   2018-05-17T13:33:51Z

unset task context after test

commit 8cefb724512b51f2aa1fdd81fa8a2d4560e60ce3
Author: Jose Torres 
Date:   2018-05-18T00:00:05Z

conf from RDD

commit f91bfe7e3fc174202d7d5c7cde5a8fb7ce86bfd3
Author: Jose Torres 
Date:   2018-05-18T00:00:44Z

endpoint name

commit 259029298fc42a65e8ebb4d2effe49b7fafa96f1
Author: Jose Torres 
Date:   2018-05-18T00:02:08Z

testing bool

commit 859e6e4dd4dd90ffd70fc9cbd243c94090d72506
Author: Jose Torres 
Date:   2018-05-18T00:22:10Z

tests

commit b23b7bb17abe3cbc873a3144c56d08c88bc0c963
Author: Jose Torres 
Date:   2018-05-18T00:40:55Z

take instead of poll

commit 97f7e8ff865e6054d0d70914ce9bb51880b161f6
Author: Jose Torres 
Date:   2018-05-18T00:58:44Z

add interface

commit de21b1c25a333d44c0521fe151b468e51f0bdc47
Author: Jose Torres 
Date:   2018-05-18T01:02:37Z

clarify comment

commit 7dcf51a13e92a0bb2998e2a12e67d351e1c1a4fc
Author: Jose Torres 
Date:   2018-05-18T22:39:28Z

multiple

commit 154843d799683c5cdfc035033475f223f85f0d66
Author: Jose Torres 
Date:   2018-05-18T22:41:18Z

don't use spark conf for the sql conf

commit f0262d0a9d3539bcf8fbdbb248968fd704d1e690
Author: Jose Torres 
Date:   2018-05-18T22:54:27Z

end thread

commit 3e7a6f9d31967d9efc618c4d319a9dabd22ae4e5
Author: Jose Torres 
Date:   2018-05-18T22:54:54Z

name thread

commit 0a38ced23b7e1a6dfe9588ef0ebf7c071a08055d
Author: Jose Torres 
Date:   2018-05-18T22:55:14Z

no toString

commit ef34e6e9817274df9378341bfb52105c591a5507
Author: Jose Torres 
Date:   2018-05-18T23:00:01Z

send method

commit 00f910ea39b76a24e1e21acdf3d6a20fd7784fa9
Author: Jose Torres 
Date:   2018-05-18T23:02:10Z

fix

commit aa463ddfd6dcc860f3a9119afe5db32db06945d8
Author: Jose Torres 
Date:   2018-05-21T16:27:23Z

Merge branch 'readerRddMaster' into multipleWrite

commit 504bf7426acf16cce21c549e494b8149dbaa3774
Author: Jose Torres 
Date:   2018-05-21T17:13:33Z

add test

commit 79c3158fc4bb70eb145503699eb6007a029e0c6c
Author: Jose Torres 
Date:   2018-05-21T20:10:06Z

Merge remote-tracking branch 'apache/master' into multipleWrite




---
