Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21560#discussion_r196999896
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
---
@@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD(
    numPartitions: Int,
    queueSize: Int = 1024,
    numShuffleWriters: Int = 1,
-    epochIntervalMs: Long = 1000)
+    epochIntervalMs: Long = 1000,
+    val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}"))
  extends RDD[UnsafeRow](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    (0 until numPartitions).map { partIndex =>
-      ContinuousShuffleReadPartition(partIndex, queueSize, numShuffleWriters, epochIntervalMs)
+      ContinuousShuffleReadPartition(
+        partIndex, endpointNames(partIndex), queueSize, numShuffleWriters, epochIntervalMs)
--- End diff ---
This effectively asserts numPartitions to be 1 (the size of the default endpointNames); otherwise endpointNames(partIndex) will throw an exception.
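
To make the failure mode concrete, here is a minimal standalone Scala sketch (the object and values are illustrative, not the Spark class itself) of the out-of-bounds lookup: the default endpointNames Seq has exactly one element, so indexing it with partIndex fails for any partition beyond the first.

```scala
import java.util.UUID
import scala.util.Try

object EndpointNamesSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors the default parameter in the diff: a single generated name.
    val endpointNames: Seq[String] =
      Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}")
    val numPartitions = 2 // anything > endpointNames.size breaks

    val lookups = (0 until numPartitions).map(i => Try(endpointNames(i)))
    assert(lookups(0).isSuccess) // partition 0 resolves its endpoint name
    assert(lookups(1).isFailure) // partition 1: IndexOutOfBoundsException
    println("default endpointNames only covers partition 0")
  }
}
```

So a caller that relies on the default while passing numPartitions > 1 only fails at getPartitions time, not at construction.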
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]