Ngone51 commented on a change in pull request #34156:
URL: https://github.com/apache/spark/pull/34156#discussion_r720615835
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -519,17 +521,19 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * but endMapIndex is excluded). If endMapIndex=Int.MaxValue, the actual endMapIndex will be
    * changed to the length of total map outputs.
    *
-   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
-   *         and the second item is a sequence of (shuffle block id, shuffle block size, map index)
-   *         tuples describing the shuffle blocks that are stored at that block manager.
-   *         Note that zero-sized blocks are excluded in the result.
+   * @return A case class object which includes two attributes. The first attribute is a sequence
+   *         of 2-item tuples, where the first item in the tuple is a BlockManagerId, and the
+   *         second item is a sequence of (shuffle block id, shuffle block size, map index) tuples
+   *         describing the shuffle blocks that are stored at that block manager. Note that
+   *         zero-sized blocks are excluded in the result. The second attribute is a boolean flag,
+   *         indicating whether batch fetch can be enabled.
    */
   def getMapSizesByExecutorId(
       shuffleId: Int,
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
+      endPartition: Int): MapSizesByExecutorId
Review comment:
Yeah, but as you may know, it's very common for Spark private APIs to be abused as public APIs. We have broken downstream use cases before by changing private APIs, e.g., the SQL UDF stuff. And I know some users implement their own shuffle module by extending `ShuffleManager`, `MapOutputTracker`, etc. Given that we're very close to the 3.2 release, I don't think it's appropriate to make this kind of change at this moment.

If you guys still insist on this approach, I'd suggest completely separating the push-based shuffle path from the normal shuffle path in `SortShuffleManager.getReader`, e.g.:
```scala
override def getReader[K, C](
    handle: ShuffleHandle,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  // The push-based shuffle path returns both the block addresses and a flag
  // saying whether batch fetch can be enabled; the normal path keeps computing
  // the flag itself, so the two paths never share the new return type.
  val (blocksByAddress, enableBatchFetch) = if (fetchMergeResult) {
    val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(...)
    (res.iter, res.enableBatchFetch)
  } else {
    val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(...)
    (address, canUseBatchFetch(startPartition, endPartition, context))
  }
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress,
    context, metrics,
    shouldBatchFetch = enableBatchFetch)
}
```
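For reference, the sketch above assumes the new return type `MapSizesByExecutorId` simply pairs the block-address iterator with the batch-fetch flag. The following is a minimal sketch inferred from the `@return` doc and the `res.iter` / `res.enableBatchFetch` usage in this thread; the actual definition in the PR may differ:
```scala
// Minimal sketch of the proposed return type, inferred from the @return doc and
// the usage above; the exact field names and modifiers may not match the PR.
private[spark] case class MapSizesByExecutorId(
    iter: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],
    enableBatchFetch: Boolean)
```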
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]