HyukjinKwon commented on a change in pull request #33310:
URL: https://github.com/apache/spark/pull/33310#discussion_r672302356
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
##########
@@ -85,12 +85,24 @@ object OptimizeLocalShuffleReader extends
CustomShuffleReaderRule {
val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
val splitPoints = if (numMappers == 0) {
Seq.empty
- } else {
- equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
+ } else if (expectedParallelism >= numMappers) {
+ equallyDivide(numReducers, expectedParallelism / numMappers)
+ }
+ else {
Review comment:
```suggestion
} else {
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
##########
@@ -58,6 +58,12 @@ case class PartialMapperPartitionSpec(
startReducerIndex: Int,
endReducerIndex: Int) extends ShufflePartitionSpec
+// Follow-up: consider using map location for coalescing mappers
Review comment:
If we plan to leave it as a follow-up, we should file a JIRA and update the
comment accordingly, for example: `// TODO(SPARK-XXXXX): blah blah blah`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
##########
@@ -72,12 +72,23 @@ object OptimizeLocalShuffleReader extends
CustomShuffleReaderRule {
val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
val splitPoints = if (numMappers == 0) {
Seq.empty
+ } else if (expectedParallelism >= numMappers) {
+ equallyDivide(numReducers, expectedParallelism / numMappers)
} else {
- equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
+ equallyDivide(numMappers, expectedParallelism)
}
- (0 until numMappers).flatMap { mapIndex =>
- (splitPoints :+ numReducers).sliding(2).map {
- case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start,
end)
+ if (expectedParallelism >= numMappers) {
+ (0 until numMappers).flatMap { mapIndex =>
+ (splitPoints :+ numReducers).sliding(2).map {
+ case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start,
end)
+ }
+ }
+ }
+ else {
Review comment:
```suggestion
} else {
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
##########
@@ -58,6 +58,12 @@ case class PartialMapperPartitionSpec(
startReducerIndex: Int,
endReducerIndex: Int) extends ShufflePartitionSpec
+// Follow-up: consider using map location for coalescing mappers
Review comment:
I suspect @cloud-fan wants to address
https://github.com/apache/spark/pull/33310/files#r669399284 in this PR itself
rather than in a follow-up (?). cc @maryannxue FYI too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]