HyukjinKwon commented on a change in pull request #33310:
URL: https://github.com/apache/spark/pull/33310#discussion_r672302356



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
##########
@@ -85,12 +85,24 @@ object OptimizeLocalShuffleReader extends 
CustomShuffleReaderRule {
     val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
     val splitPoints = if (numMappers == 0) {
       Seq.empty
-    } else {
-      equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
+    } else if (expectedParallelism >= numMappers) {
+      equallyDivide(numReducers, expectedParallelism / numMappers)
+    }
+    else {

Review comment:
       ```suggestion
       } else {
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
##########
@@ -58,6 +58,12 @@ case class PartialMapperPartitionSpec(
     startReducerIndex: Int,
     endReducerIndex: Int) extends ShufflePartitionSpec
 
+// Follow-up: consider using map location for coalescing mappers

Review comment:
       If we plan to leave it as a followup, we should file a JIRA, and fix the 
comment as, for example, `// TODO(SPARK-XXXXX): blah blah blah`

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
##########
@@ -72,12 +72,23 @@ object OptimizeLocalShuffleReader extends 
CustomShuffleReaderRule {
     val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
     val splitPoints = if (numMappers == 0) {
       Seq.empty
+    } else if (expectedParallelism >= numMappers) {
+      equallyDivide(numReducers, expectedParallelism / numMappers)
     } else {
-      equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
+      equallyDivide(numMappers, expectedParallelism)
     }
-    (0 until numMappers).flatMap { mapIndex =>
-      (splitPoints :+ numReducers).sliding(2).map {
-        case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start, 
end)
+    if (expectedParallelism >= numMappers) {
+      (0 until numMappers).flatMap { mapIndex =>
+        (splitPoints :+ numReducers).sliding(2).map {
+          case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start, 
end)
+        }
+      }
+    }
+    else {

Review comment:
       ```suggestion
       } else {
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
##########
@@ -58,6 +58,12 @@ case class PartialMapperPartitionSpec(
     startReducerIndex: Int,
     endReducerIndex: Int) extends ShufflePartitionSpec
 
+// Follow-up: consider using map location for coalescing mappers

Review comment:
       I guess @cloud-fan wants to address 
https://github.com/apache/spark/pull/33310/files#r669399284 in this PR though 
(?). cc @maryannxue FYI too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to