[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-15 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r334792481
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
 ##
 @@ -36,18 +36,33 @@ private[spark] class BlockStoreShuffleReader[K, C](
 readMetrics: ShuffleReadMetricsReporter,
 serializerManager: SerializerManager = SparkEnv.get.serializerManager,
 blockManager: BlockManager = SparkEnv.get.blockManager,
-mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
+mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
+mapId: Option[Int] = None)
   extends ShuffleReader[K, C] with Logging {
 
   private val dep = handle.dependency
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+val blocksByAddress = mapId match {
+  case (Some(mapId)) => mapOutputTracker.getMapSizesByExecutorId(
+handle.shuffleId,
+startPartition,
+endPartition,
+mapId)
+  case (None) => mapOutputTracker.getMapSizesByExecutorId(
+handle.shuffleId,
+startPartition,
+endPartition)
+  case (_) => throw new IllegalArgumentException(
 
 Review comment:
   nit:
   ```
   case Some(..) =>
   case None =>
   ```
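   
   i.e., keeping the names from the diff above, this is what the cleaned-up match could look like (a sketch; note the `case _` branch becomes unreachable once `Option` is covered exhaustively):
   ```
   val blocksByAddress = mapId match {
     case Some(mapId) => mapOutputTracker.getMapSizesByExecutorId(
       handle.shuffleId, startPartition, endPartition, mapId)
     case None => mapOutputTracker.getMapSizesByExecutorId(
       handle.shuffleId, startPartition, endPartition)
   }
   ```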





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-14 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r334379189
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 ##
 @@ -221,7 +230,8 @@ class AdaptiveQueryExecSuite
   val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
   assert(bhj.size == 2)
   val ex = findReusedExchange(adaptivePlan)
-  assert(ex.size == 1)
+  // The ReusedExchange is hidden in LocalShuffleReaderExec
 
 Review comment:
   We should update `findReusedExchange` and make it go through 
`LocalShuffleReaderExec`
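   
   A sketch of what the updated helper could look like (assuming the existing `findReusedExchange` collects `ReusedExchangeExec` nodes; the extra case recurses into the reader's child, which a plain `collect` would not visit because `LocalShuffleReaderExec` is a leaf node):
   ```
   private def findReusedExchange(plan: SparkPlan): Seq[ReusedExchangeExec] = {
     plan.collect {
       // descend through the local reader explicitly, since it hides its child
       case reader: LocalShuffleReaderExec => findReusedExchange(reader.child)
       case e: ReusedExchangeExec => Seq(e)
     }.flatten
   }
   ```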





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-14 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r334377854
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 ##
 @@ -355,6 +365,28 @@ class AdaptiveQueryExecSuite
 }
   }
 
+  test("Change merge join to broadcast join and optimize the shuffle" +
+" reader to local shuffle reader") {
 
 Review comment:
   how about "Change merge join to broadcast join without local shuffle reader"





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-14 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r334369858
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
 ##
 @@ -893,31 +979,59 @@ private[spark] object MapOutputTracker extends Logging {
   startPartition: Int,
   endPartition: Int,
   statuses: Array[MapStatus],
-  useOldFetchProtocol: Boolean): Iterator[(BlockManagerId, Seq[(BlockId, 
Long, Int)])] = {
+  useOldFetchProtocol: Boolean,
+  mapId : Option[Int] = None): Iterator[(BlockManagerId, Seq[(BlockId, 
Long, Int)])] = {
 assert (statuses != null)
 val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, 
Long, Int)]]
-for ((status, mapIndex) <- statuses.iterator.zipWithIndex) {
-  if (status == null) {
-val errorMessage = s"Missing an output location for shuffle $shuffleId"
-logError(errorMessage)
-throw new MetadataFetchFailedException(shuffleId, startPartition, 
errorMessage)
-  } else {
-for (part <- startPartition until endPartition) {
-  val size = status.getSizeForBlock(part)
-  if (size != 0) {
-if (useOldFetchProtocol) {
-  // While we use the old shuffle fetch protocol, we use mapIndex 
as mapId in the
-  // ShuffleBlockId.
-  splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
-((ShuffleBlockId(shuffleId, mapIndex, part), size, mapIndex))
-} else {
-  splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
-((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, 
mapIndex))
+mapId match {
+  case (Some(mapId)) =>
+for ((status, mapIndex) <- statuses.iterator.zipWithIndex.filter(_._2 
== mapId)) {
 
 Review comment:
   nit:
   ```
   val iter = statuses.iterator.zipWithIndex
   for ((status, mapIndex) <- mapId.map(id => iter.filter(_._2 == id)).getOrElse(iter))
   ```
   
   Then we can avoid a lot of duplicated code.
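   
   Spelled out against the diff above (a sketch; the non-null branch is abbreviated), the merged version would be:
   ```
   val iter = statuses.iterator.zipWithIndex
   for ((status, mapIndex) <- mapId.map(id => iter.filter(_._2 == id)).getOrElse(iter)) {
     if (status == null) {
       val errorMessage = s"Missing an output location for shuffle $shuffleId"
       logError(errorMessage)
       throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
     } else {
       // same per-partition block collection as before, keyed on useOldFetchProtocol
     }
   }
   ```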





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-14 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r334366021
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
 ##
 @@ -669,6 +685,31 @@ private[spark] class MapOutputTrackerMaster(
 None
   }
 
+  /**
+   * Return the locations where the Mapper(s) ran. The locations each includes 
both a host and an
 
 Review comment:
   The doc should be updated as we only get one location for one mapper now.
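   
   For example, the Scaladoc could become something like:
   ```
   /**
    * Return the location where the given mapper ran. The location includes both a host and
    * an executor id on that host.
    */
   ```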





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333946016
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
 ##
 @@ -749,6 +818,26 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
 }
   }
 
+  override def getMapSizesByExecutorId(
+  shuffleId: Int,
+  startPartition: Int,
+  endPartition: Int,
+  mapId: Int,
+  useOldFetchProtocol: Boolean) : Iterator[(BlockManagerId, Seq[(BlockId, 
Long, Int)])] = {
+logDebug(s"Fetching outputs for shuffle $shuffleId, mapId $mapId" +
+  s"partitions $startPartition-$endPartition")
+val statuses = getStatuses(shuffleId)
 
 Review comment:
   Since we need to get all the shuffle statuses anyway, we can call the existing `convertMapStatuses` and do an extra filter to only collect the blocks whose mapIndex is the one we want.
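   
   A rough sketch of that approach, reusing `convertMapStatuses` from the companion object (signatures abbreviated; the trailing filter keeps only blocks produced by the requested map index):
   ```
   val statuses = getStatuses(shuffleId)
   // existing conversion over all map statuses
   val blocksByAddress = MapOutputTracker.convertMapStatuses(
     shuffleId, startPartition, endPartition, statuses, useOldFetchProtocol)
   // extra filter: keep only the blocks written by the requested mapIndex
   blocksByAddress.map { case (address, blocks) =>
     (address, blocks.filter { case (_, _, mapIndex) => mapIndex == mapId })
   }.filter { case (_, blocks) => blocks.nonEmpty }
   ```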





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333943901
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
 ##
 @@ -669,6 +685,34 @@ private[spark] class MapOutputTrackerMaster(
 None
   }
 
+  /**
+   * Return the locations where the Mapper(s) ran. The locations each includes 
both a host and an
+   * executor id on that host.
+   *
+   * @param dep shuffle dependency object
+   * @param startMapId the start map id
+   * @param endMapId the end map id
 
 Review comment:
   We shouldn't add code that is only for future use. For now we only need a single `mapId`, IIUC.
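   
   I.e. the signature could shrink to something like (method name hypothetical, for illustration only):
   ```
   def getMapLocation(dep: ShuffleDependency[_, _, _], mapId: Int): Seq[String]
   ```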





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333943188
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
 ##
 @@ -54,6 +54,19 @@ private[spark] trait ShuffleManager {
   context: TaskContext,
   metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
 
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive) to
+   * read from mapId.
+   * Called on executors by reduce tasks.
+   */
+  def getMapReader[K, C](
 
 Review comment:
   instead of creating a new API, how about we set the map id in 
`TaskContext.getLocalProperties`, and still call the existing `getReader` 
method?
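   
   A sketch of that alternative (the property key here is made up purely for illustration):
   ```
   // Driver side, before submitting the stage:
   sparkContext.setLocalProperty("spark.sql.adaptive.localShuffleReader.mapId", mapId.toString)
   
   // Reader side, inside read(), instead of a new constructor parameter:
   val mapId: Option[Int] =
     Option(context.getLocalProperty("spark.sql.adaptive.localShuffleReader.mapId")).map(_.toInt)
   ```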





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333941162
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
 ##
 @@ -36,19 +36,35 @@ private[spark] class BlockStoreShuffleReader[K, C](
 readMetrics: ShuffleReadMetricsReporter,
 serializerManager: SerializerManager = SparkEnv.get.serializerManager,
 blockManager: BlockManager = SparkEnv.get.blockManager,
-mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
+mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
+mapId: Option[Int] = None)
   extends ShuffleReader[K, C] with Logging {
 
   private val dep = handle.dependency
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+val blocksByAddress = (mapId) match {
 
 Review comment:
   `(mapId)` -> `mapId`





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333940553
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
 ##
 @@ -54,6 +54,19 @@ private[spark] trait ShuffleManager {
   context: TaskContext,
   metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
 
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive) to
+   * read from mapId.
+   * Called on executors by reduce tasks.
+   */
+  def getMapReader[K, C](
+  handle: ShuffleHandle,
+  startPartition: Int,
+  endPartition: Int,
 
 Review comment:
   Now the local shuffle reader always reads all the shuffle blocks of one mapper. Let's add the above 2 parameters later when we are going to use them.
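   
   In other words, for now the reader could be obtained with just (a sketch, keeping the other parameter names from the diff):
   ```
   def getMapReader[K, C](
       handle: ShuffleHandle,
       mapId: Int,
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
   ```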





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333940060
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -2137,6 +2145,8 @@ class SQLConf extends Serializable with Logging {
   def maxNumPostShufflePartitions: Int =
 
getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
 
+  def optimizedLocalShuffleReaderEnabled: Boolean = 
getConf(OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED)
 
 Review comment:
   It's only called once; we don't need to create a method.





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333939935
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -388,6 +388,14 @@ object SQLConf {
 "must be a positive integer.")
   .createOptional
 
+  val OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED =
 
 Review comment:
   `OPTIMIZED` -> `OPTIMIZE`





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333935255
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleReadMetricsReporter}
+
+
+/**
+ * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition
+ * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle 
partitions
+ * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, 
inclusive).
+ */
+private final class LocalShuffleRowRDDPartition(
+val preShufflePartitionIndex: Int,
+val startPostShufflePartitionIndex: Int,
 
 Review comment:
   This is not used anywhere.





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333934898
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleReadMetricsReporter}
+
+
+/**
+ * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition
+ * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle 
partitions
+ * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, 
inclusive).
+ */
+private final class LocalShuffleRowRDDPartition(
 
 Review comment:
   `ShuffleRow` -> `ShuffledRow`





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333934690
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleReadMetricsReporter}
+
+
+/**
+ * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition
+ * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle 
partitions
+ * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, 
inclusive).
+ */
+private final class LocalShuffleRowRDDPartition(
+val preShufflePartitionIndex: Int,
+val startPostShufflePartitionIndex: Int,
+val endPostShufflePartitionIndex: Int) extends Partition {
+  override val index: Int = preShufflePartitionIndex
+}
+
+/**
+ * This is a specialized version of 
[[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used
+ * in Spark SQL adaptive execution when a shuffle join is converted to 
broadcast join at runtime
+ * because the map output of one input table is small enough for broadcast. 
This RDD represents the
+ * data of another input table of the join that reads from shuffle. Each 
partition of the RDD reads
+ * the whole data from just one mapper output locally. So actually there is no 
data transferred
+ * from the network.
+
+ * This RDD takes a [[ShuffleDependency]] (`dependency`).
+ *
+ * The `dependency` has the parent RDD of this RDD, which represents the 
dataset before shuffle
+ * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs.
+ * Partition ids should be in the range [0, numPartitions - 1].
+ * `dependency.partitioner.numPartitions` is the number of pre-shuffle 
partitions. (i.e. the number
+ * of partitions of the map output). The post-shuffle partition number is the 
same to the parent
+ * RDD's partition number.
+ */
+class LocalShuffledRowRDD(
+ var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
+ metrics: Map[String, SQLMetric])
+  extends RDD[InternalRow](dependency.rdd.context, Nil) {
+
+  private[this] val numReducers = dependency.partitioner.numPartitions
+  private[this] val numMappers = dependency.rdd.partitions.length
+
+  override def getDependencies: Seq[Dependency[_]] = List(dependency)
+
+  override def getPartitions: Array[Partition] = {
+
+Array.tabulate[Partition](numMappers) { i =>
+  new LocalShuffleRowRDDPartition(i, 0, numReducers)
+}
+  }
+
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+val tracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
 
 Review comment:
   why not just call `dependency`?





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333932293
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 ##
 @@ -125,6 +130,10 @@ class AdaptiveQueryExecSuite
   assert(smj.size == 1)
   val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
   assert(bhj.size == 1)
+  val localReaders = adaptivePlan.collect {
+case reader: LocalShuffleReaderExec => reader
+  }
+  assert(localReaders.length === 1)
 
 Review comment:
   we can add a method
   ```
   def checkNumLocalShuffleReaders(plan: SparkPlan, expected: Int): Unit = ...
   ```
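   
   A minimal body for it, based on the `collect` in the diff above (a sketch, assuming the suite's ScalaTest `assert`):
   ```
   def checkNumLocalShuffleReaders(plan: SparkPlan, expected: Int): Unit = {
     val localReaders = plan.collect {
       case reader: LocalShuffleReaderExec => reader
     }
     assert(localReaders.length === expected)
   }
   ```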





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333927414
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, 
BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildRight &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+  (join.buildSide == BuildRight && 
join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildLeft &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+  (join.buildSide == BuildLeft && 
join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+join.copy(right = 
join.right.asInstanceOf[LocalShuffleReaderExec].child)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+}
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+val optimizedPlan = plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+val localReader = 
LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+val localReader = 
LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+
+if (numExchanges > 0) {
+  logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+" to additional shuffles will be introduced.")
+  revertLocalShuffleReader(optimizedPlan)
+} else {
+  optimizedPlan
+}
+  }
+}
+
+case class LocalShuffleReaderExec(
+child: QueryStageExec) extends LeafExecNode {
 
 Review comment:
   nit: we can put it in the previous line





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333926473
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, 
BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildRight &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+  (join.buildSide == BuildRight && 
join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildLeft &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+  (join.buildSide == BuildLeft && 
join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+join.copy(right = 
join.right.asInstanceOf[LocalShuffleReaderExec].child)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+}
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+val optimizedPlan = plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+val localReader = 
LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+val localReader = 
LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+
+if (numExchanges > 0) {
+  logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+" to additional shuffles will be introduced.")
+  revertLocalShuffleReader(optimizedPlan)
+} else {
+  optimizedPlan
+}
+  }
+}
+
+case class LocalShuffleReaderExec(
+child: QueryStageExec) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def outputPartitioning: Partitioning = {
+
+def canUseChildPartitioning(stage: ShuffleQueryStageExec): Partitioning = {
 
 Review comment:
   The method name sounds like it returns a boolean. How about 
`tryReserveChildPartitioning`?



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333926303
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, 
BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildRight &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+  (join.buildSide == BuildRight && 
join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildLeft &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+  (join.buildSide == BuildLeft && 
join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+join.copy(right = 
join.right.asInstanceOf[LocalShuffleReaderExec].child)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+}
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+val optimizedPlan = plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+val localReader = 
LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+val localReader = 
LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+
+if (numExchanges > 0) {
+  logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+" to additional shuffles will be introduced.")
+  revertLocalShuffleReader(optimizedPlan)
+} else {
+  optimizedPlan
+}
+  }
+}
+
+case class LocalShuffleReaderExec(
+child: QueryStageExec) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def outputPartitioning: Partitioning = {
+
+def canUseChildPartitioning(stage: ShuffleQueryStageExec): Partitioning = {
+  val initialPartitioning = stage.plan.asInstanceOf[ShuffleExchangeExec]
 
 Review comment:
   `.asInstanceOf[ShuffleExchangeExec]` is not necessary.



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333901952
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, 
BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildRight &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+  (join.buildSide == BuildRight && 
join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildLeft &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+  (join.buildSide == BuildLeft && 
join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+join.copy(right = 
join.right.asInstanceOf[LocalShuffleReaderExec].child)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+}
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+val optimizedPlan = plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+val localReader = 
LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+val localReader = 
LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+
+if (numExchanges > 0) {
 
 Review comment:
   It's very likely that we have exchanges in the query plan; what we need to guarantee here is that we don't introduce extra exchanges. We should do:
   ```
   def numExchanges...
   val numExchangesBefore = numExchanges(EnsureRequirements(conf).apply(plan))
   val numExchangesAfter = numExchanges(EnsureRequirements(conf).apply(optimizedPlan))
   if (numExchangesAfter > numExchangesBefore) {
     plan
   } else {
     optimizedPlan
   }
   ```
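   
   where `numExchanges` is just the collect-and-count already used in the diff, e.g.:
   ```
   def numExchanges(plan: SparkPlan): Int = plan.collect {
     case e: ShuffleExchangeExec => e
   }.length
   ```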





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333900101
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, 
BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildRight &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+  (join.buildSide == BuildRight && 
join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildLeft &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+  (join.buildSide == BuildLeft && 
join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+join.copy(right = 
join.right.asInstanceOf[LocalShuffleReaderExec].child)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+}
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+val optimizedPlan = plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+val localReader = 
LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+val localReader = 
LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+
+if (numExchanges > 0) {
+  logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+" to additional shuffles will be introduced.")
+  revertLocalShuffleReader(optimizedPlan)
 
 Review comment:
   We don't need to revert, just return the original plan `plan`.





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-08 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r332436377
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
+join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)
+  }
+
+  def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
+join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+plan.transformDown{
+  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) =>
+val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) =>
+val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+  }
 
 Review comment:
   I spent more time thinking about the cost framework, and I think we can't fully rely on it right now.
   
   For example, if there is a `rule1` doing something good, and a `rule2` doing something negative, the current cost framework in AQE simply discards the entire re-optimization, so neither `rule1` nor `rule2` takes effect. What we expect is for `rule1` to take effect while `rule2` does not.
   
   I think each rule should try its best to make sure it is always positive. For this particular rule, we can run `EnsureRequirements` on the original plan and the updated plan, and only keep the updated plan if the number of shuffles doesn't increase.
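   A rough sketch of that check (illustrative only; `plan` is the original plan and `optimizedPlan` the plan with local readers):
   ```
   def numShuffles(p: SparkPlan): Int =
     EnsureRequirements(conf).apply(p).collect { case e: ShuffleExchangeExec => e }.length
   // Keep the optimized plan only if it does not add shuffles over the original.
   if (numShuffles(optimizedPlan) <= numShuffles(plan)) optimizedPlan else plan
   ```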


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-08 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r332434571
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -74,7 +74,7 @@ case class AdaptiveSparkPlanExec(
   @transient private val optimizer = new RuleExecutor[LogicalPlan] {
 // TODO add more optimization rules
 override protected def batches: Seq[Batch] = Seq(
-  Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
+  // Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
 
 Review comment:
   why disable it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-08 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r332433471
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
 ##
 @@ -190,7 +190,7 @@ case class CoalescedShuffleReaderExec(
 UnknownPartitioning(partitionStartIndices.length)
   }
 
-  private var cachedShuffleRDD: ShuffledRowRDD = null
+  private var cachedShuffleRDD: RDD[InternalRow] = null
 
 Review comment:
   why change this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-08 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r332433103
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
+join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)
+  }
+
+  def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
+join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+plan.transformDown{
+  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) =>
+val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) =>
+val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+  }
+}
+
+case class LocalShuffleReaderExec(
+child: QueryStageExec) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def outputPartitioning: Partitioning = {
+val numPartitions = child match {
+  case stage: ShuffleQueryStageExec =>
+stage.plan.shuffleDependency.rdd.partitions.length
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+stage.plan.shuffleDependency.rdd.partitions.length
+}
+UnknownPartitioning(numPartitions)
+  }
+
+  private var cachedShuffleRDD: RDD[InternalRow] = null
+
+  override protected def doExecute(): RDD[InternalRow] = {
+if (cachedShuffleRDD == null) {
+  cachedShuffleRDD = child match {
+case stage: ShuffleQueryStageExec =>
+  stage.plan.createLocalShuffleRDD()
+case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+  stage.plan.createLocalShuffleRDD()
+  }
+}
+cachedShuffleRDD
+  }
+
+  override def generateTreeString(
+depth: Int,
 
 Review comment:
   nit: 4 space indentation for method parameters.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-08 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r332432970
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
+join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)
+  }
+
+  def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
+join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+plan.transformDown{
+  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) =>
+val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) =>
+val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+  }
+}
+
+case class LocalShuffleReaderExec(
+child: QueryStageExec) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def outputPartitioning: Partitioning = {
 
 Review comment:
   For example, given `LocalShuffleReaderExec(ShuffleQueryStage(scan))`, shouldn't we report `outputPartitioning` as `scan.outputPartitioning`? Each task of the `scan` operator writes data to one mapper, so the output partitioning of the local shuffle reader should be the same as that of the operator before the shuffle.
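   A sketch of what that could look like (hypothetical; assumes `stage.plan` is the stage's `ShuffleExchangeExec`, so `stage.plan.child` is the operator before the shuffle):
   ```
   override def outputPartitioning: Partitioning = child match {
     // Each local-reader partition maps 1:1 to a map task, so the
     // partitioning of the plan below the exchange is preserved.
     case stage: ShuffleQueryStageExec => stage.plan.child.outputPartitioning
     case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
       stage.plan.child.outputPartitioning
   }
   ```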


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-08 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r332404035
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
+join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)
+  }
+
+  def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
+join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+plan.transformDown{
+  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) =>
+val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
 
 Review comment:
   We can't just call `.asInstanceOf[QueryStageExec]` here. 
`ShuffleQueryStageExec.isShuffleQueryStageExec` returns true for 
`ReusedQueryStageExec` as well.
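   A sketch of a match that covers both cases (illustrative only):
   ```
   join.right match {
     case stage: ShuffleQueryStageExec => LocalShuffleReaderExec(stage)
     // A ReusedQueryStageExec wrapping a shuffle stage must be handled explicitly.
     case reused @ ReusedQueryStageExec(_, _: ShuffleQueryStageExec, _) =>
       LocalShuffleReaderExec(reused)
   }
   ```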


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-08 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r332402521
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
+join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)
+  }
+
+  def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
+join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+plan.transformDown{
+  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) =>
 
 Review comment:
   shall we call it `canUseLocalShuffleReaderRight`? Here we wrap the right 
child with `LocalShuffleReaderExec`.
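   i.e. a sketch of the suggested naming:
   ```
   // The guard decides whether the *right* child gets the local reader:
   case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) =>
     join.copy(right = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]))
   ```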


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-08 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r332401684
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.{RDD, ShuffledRDDPartition}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
+
+/**
+ * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used
+ * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime
+ * because the map output of one input table is small enough for broadcast. This RDD represents the
+ * data of another input table of the join that reads from shuffle. Each partition of the RDD reads
+ * the whole data from just one mapper output locally. So actually there is no data transferred
+ * from the network.
+
+ * This RDD takes a [[ShuffleDependency]] (`dependency`).
+ *
+ * The `dependency` has the parent RDD of this RDD, which represents the dataset before shuffle
+ * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs.
+ * Partition ids should be in the range [0, numPartitions - 1].
+ * `dependency.partitioner.numPartitions` is the number of pre-shuffle partitions. (i.e. the number
+ * of partitions of the map output). The post-shuffle partition number is the same to the parent
+ * RDD's partition number.
+ */
+class LocalShuffledRowRDD(
+ var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
+ metrics: Map[String, SQLMetric],
+ specifiedPartitionStartIndices: Option[Array[Int]] = None,
+ specifiedPartitionEndIndices: Option[Array[Int]] = None)
+  extends RDD[InternalRow](dependency.rdd.context, Nil) {
+
+  private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions
+  private[this] val numPostShufflePartitions = dependency.rdd.partitions.length
 
 Review comment:
   hmm, which "shuffle" we are talking about here? If number of mappers is the 
post shuffle partitions, then it means these mappers are also reducers and 
there is another shuffle upstream.
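   For reference, a sketch of the two quantities in question (illustrative names; the fields come from the diff above):
   ```
   // Reduce-side partition count of this shuffle:
   val numReducers = dependency.partitioner.numPartitions
   // Number of map tasks that wrote shuffle output; the local reader
   // creates one partition per mapper rather than one per reducer:
   val numMappers = dependency.rdd.partitions.length
   ```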


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-20 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r326512378
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = {
+shuffleStage match {
+  case stage: ShuffleQueryStageExec =>
+stage.isLocalShuffle = false
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+stage.isLocalShuffle = false
+}
+shuffleStage
+  }
+
+  private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = {
+val revertPlan = newPlan.transformUp {
+  case localReader: LocalShuffleReaderExec
+if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) =>
+setIsLocalToFalse(localReader.child)
+}
+revertPlan
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+// Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return.
+val bhjs = plan.collect {
+  case bhj: BroadcastHashJoinExec => bhj
+}
+
+if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) {
+  return plan
+}
+
+// If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true
+bhjs.map {
+  case bhj: BroadcastHashJoinExec =>
+bhj.children map {
+  case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+stage.isLocalShuffle = true
+  case plan: SparkPlan => plan
+}
+}
+
+// Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true
+val newPlan = plan.transformUp {
+  case stage: ShuffleQueryStageExec if (stage.isLocalShuffle) =>
+LocalShuffleReaderExec(stage)
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) if (stage.isLocalShuffle) =>
+LocalShuffleReaderExec(stage)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(newPlan)
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+if (numExchanges > 0) {
+  logWarning("Local shuffle reader optimization is not applied due" +
+" to additional shuffles will be introduced.")
+  revertLocalShuffleReader(newPlan)
+} else {
+  newPlan
+}
+  }
+}
+
+case class LocalShuffleReaderExec(
+child: QueryStageExec) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def outputPartitioning: Partitioning = {
 
 Review comment:
   sorry, I mean `child.child.outputPartitioning`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325545171
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.{RDD, ShuffledRDDPartition}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
+
+/**
+ * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used
+ * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime
+ * because the map output of one input table is small enough for broadcast. This RDD represents the
+ * data of another input table of the join that reads from shuffle. Each partition of the RDD reads
+ * the whole data from just one mapper output locally. So actually there is no data transferred
+ * from the network.
+
+ * This RDD takes a [[ShuffleDependency]] (`dependency`).
+ *
+ * The `dependency` has the parent RDD of this RDD, which represents the dataset before shuffle
+ * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs.
+ * Partition ids should be in the range [0, numPartitions - 1].
+ * `dependency.partitioner.numPartitions` is the number of pre-shuffle partitions. (i.e. the number
+ * of partitions of the map output). The post-shuffle partition number is the same to the parent
+ * RDD's partition number.
+ */
+class LocalShuffledRowRDD(
+ var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
+ metrics: Map[String, SQLMetric],
+ specifiedPartitionStartIndices: Option[Array[Int]] = None,
+ specifiedPartitionEndIndices: Option[Array[Int]] = None)
+  extends RDD[InternalRow](dependency.rdd.context, Nil) {
+
+  private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions
+  private[this] val numPostShufflePartitions = dependency.rdd.partitions.length
+
+  private[this] val partitionStartIndices: Array[Int] = specifiedPartitionStartIndices match {
+case Some(indices) => indices
+case None => Array(0)
+  }
+
+  private[this] val partitionEndIndices: Array[Int] = specifiedPartitionEndIndices match {
+case Some(indices) => indices
+case None if specifiedPartitionStartIndices.isEmpty => Array(numPreShufflePartitions)
+case _ => specifiedPartitionStartIndices.get.drop(1) :+ numPreShufflePartitions
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = List(dependency)
+
+  override def getPartitions: Array[Partition] = {
+assert(partitionStartIndices.length == partitionEndIndices.length)
+Array.tabulate[Partition](numPostShufflePartitions) { i =>
+  new ShuffledRDDPartition(i)
+}
+  }
+
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+tracker.getMapLocation(dep, partition.index, partition.index + 1)
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+val shuffledRowPartition = split.asInstanceOf[ShuffledRDDPartition]
+val mapId = shuffledRowPartition.index
+val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+// `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
+// as well as the `tempMetrics` for basic shuffle metrics.
+val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
+// Connect the InternalRows read by each ShuffleReader
+new Iterator[InternalRow] {
+  val readers = partitionStartIndices.zip(partitionEndIndices).map { case (start, end) =>
 
 Review comment:
   I get your point that some shuffle blocks are empty and we should skip them, 
but I think this 

[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325542826
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
 ##
 @@ -629,6 +645,35 @@ private[spark] class MapOutputTrackerMaster(
 None
   }
 
+  /**
+   * Return the locations where the Mapper(s) ran. The locations each includes both a host and an
+   * executor id on that host.
+   *
+   * @param dep shuffle dependency object
+   * @param startMapId the start map id
+   * @param endMapId the end map id
+   * @return a sequence of locations that each includes both a host and an executor id on that
 
 Review comment:
   `includes both a host and an executor id` is confusing. We can just say `task location string (please refer to TaskLocation)`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325540713
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
 ##
 @@ -629,6 +645,35 @@ private[spark] class MapOutputTrackerMaster(
 None
   }
 
+  /**
+   * Return the locations where the Mapper(s) ran. The locations each includes both a host and an
+   * executor id on that host.
+   *
+   * @param dep shuffle dependency object
+   * @param startMapId the start map id
+   * @param endMapId the end map id
+   * @return a sequence of locations that each includes both a host and an executor id on that
 
 Review comment:
   Why not return `ExecutorCacheTaskLocation`?
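   A sketch of what that could look like (hypothetical; assumes the map statuses for the shuffle are at hand as `statuses`):
   ```
   statuses.slice(startMapId, endMapId).map { status =>
     // Encode host + executor id so the scheduler can prefer that executor.
     ExecutorCacheTaskLocation(status.location.host, status.location.executorId).toString
   }
   ```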


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325539965
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.{RDD, ShuffledRDDPartition}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
+
+/**
+ * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used
+ * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime
+ * because the map output of one input table is small enough for broadcast. This RDD represents the
+ * data of another input table of the join that reads from shuffle. Each partition of the RDD reads
+ * the whole data from just one mapper output locally. So actually there is no data transferred
+ * from the network.
+
+ * This RDD takes a [[ShuffleDependency]] (`dependency`).
+ *
+ * The `dependency` has the parent RDD of this RDD, which represents the dataset before shuffle
+ * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs.
+ * Partition ids should be in the range [0, numPartitions - 1].
+ * `dependency.partitioner.numPartitions` is the number of pre-shuffle partitions. (i.e. the number
+ * of partitions of the map output). The post-shuffle partition number is the same to the parent
+ * RDD's partition number.
+ */
+class LocalShuffledRowRDD(
+ var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
+ metrics: Map[String, SQLMetric],
+ specifiedPartitionStartIndices: Option[Array[Int]] = None,
+ specifiedPartitionEndIndices: Option[Array[Int]] = None)
+  extends RDD[InternalRow](dependency.rdd.context, Nil) {
+
+  private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions
+  private[this] val numPostShufflePartitions = dependency.rdd.partitions.length
 
 Review comment:
   The name is wrong. This is the # of mappers and thus should be called 
`numPreShufflePartitions`
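   One way to read the suggestion (a sketch; `numMappers` is an illustrative name for the pre-shuffle side):
   ```
   // Number of map tasks that produced the shuffle output, not post-shuffle partitions:
   private[this] val numMappers = dependency.rdd.partitions.length
   ```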


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325538188
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.{RDD, ShuffledRDDPartition}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
+
+/**
+ * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used
+ * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime
+ * because the map output of one input table is small enough for broadcast. This RDD represents the
+ * data of another input table of the join that reads from shuffle. Each partition of the RDD reads
+ * the whole data from just one mapper output locally. So actually there is no data transferred
+ * from the network.
+
+ * This RDD takes a [[ShuffleDependency]] (`dependency`).
+ *
+ * The `dependency` has the parent RDD of this RDD, which represents the dataset before shuffle
+ * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs.
+ * Partition ids should be in the range [0, numPartitions - 1].
+ * `dependency.partitioner.numPartitions` is the number of pre-shuffle partitions. (i.e. the number
+ * of partitions of the map output). The post-shuffle partition number is the same to the parent
+ * RDD's partition number.
+ */
+class LocalShuffledRowRDD(
+ var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
+ metrics: Map[String, SQLMetric],
+ specifiedPartitionStartIndices: Option[Array[Int]] = None,
+ specifiedPartitionEndIndices: Option[Array[Int]] = None)
 
 Review comment:
   then let's add it when you propose this optimization.
   
   From my side, I think it may be beneficial to keep empty tasks, so that the 
local shuffle reader node can retain the output partitioning from the original 
plan and help us eliminate shuffles.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325536699
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = {
+shuffleStage match {
+  case stage: ShuffleQueryStageExec =>
+stage.isLocalShuffle = false
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+stage.isLocalShuffle = false
+}
+shuffleStage
+  }
+
+  private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = {
+val revertPlan = newPlan.transformUp {
+  case localReader: LocalShuffleReaderExec
+if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) =>
+setIsLocalToFalse(localReader.child)
+}
+revertPlan
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+// Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return.
+val bhjs = plan.collect {
+  case bhj: BroadcastHashJoinExec => bhj
+}
+
+if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) {
+  return plan
+}
+
+// If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true
+bhjs.map {
+  case bhj: BroadcastHashJoinExec =>
+bhj.children map {
+  case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+stage.isLocalShuffle = true
+  case plan: SparkPlan => plan
+}
+}
+
+// Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true
+val newPlan = plan.transformUp {
+  case stage: ShuffleQueryStageExec if (stage.isLocalShuffle) =>
+LocalShuffleReaderExec(stage)
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) if (stage.isLocalShuffle) =>
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(newPlan)
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+if (numExchanges > 0) {
+  logWarning("Local shuffle reader optimization is not applied due" +
+" to additional shuffles will be introduced.")
+  revertLocalShuffleReader(newPlan)
+} else {
+  newPlan
+}
+  }
+}
+
+case class LocalShuffleReaderExec(
+child: QueryStageExec) extends UnaryExecNode {
 
 Review comment:
   We can make it a leaf node to hide its `QueryStageExec`. We don't expect any 
other rules to change the underlying shuffle stage.
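   i.e. a sketch (the stage stays as a constructor parameter rather than a visible tree child):
   ```
   case class LocalShuffleReaderExec(child: QueryStageExec) extends LeafExecNode {
     // As a leaf node, `child` is invisible to tree transformations,
     // so other rules cannot rewrite the underlying shuffle stage.
     override def output: Seq[Attribute] = child.output
   }
   ```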


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325536116
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = {
+shuffleStage match {
+  case stage: ShuffleQueryStageExec =>
+stage.isLocalShuffle = false
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+stage.isLocalShuffle = false
+}
+shuffleStage
+  }
+
+  private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = {
+val revertPlan = newPlan.transformUp {
+  case localReader: LocalShuffleReaderExec
+if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) =>
+setIsLocalToFalse(localReader.child)
+}
+revertPlan
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+// Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return.
+val bhjs = plan.collect {
+  case bhj: BroadcastHashJoinExec => bhj
+}
+
+if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) {
+  return plan
+}
+
+// If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true
+bhjs.map {
+  case bhj: BroadcastHashJoinExec =>
+bhj.children map {
+  case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+stage.isLocalShuffle = true
+  case plan: SparkPlan => plan
+}
+}
+
+// Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true
+val newPlan = plan.transformUp {
+  case stage: ShuffleQueryStageExec if (stage.isLocalShuffle) =>
+LocalShuffleReaderExec(stage)
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) if (stage.isLocalShuffle) =>
+LocalShuffleReaderExec(stage)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(newPlan)
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+if (numExchanges > 0) {
+  logWarning("Local shuffle reader optimization is not applied due" +
+" to additional shuffles will be introduced.")
+  revertLocalShuffleReader(newPlan)
+} else {
+  newPlan
+}
+  }
+}
+
+case class LocalShuffleReaderExec(
+child: QueryStageExec) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def outputPartitioning: Partitioning = {
 
 Review comment:
   shouldn't this be `child.outputPartitioning`?
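   
   For reference, the delegation suggested here would presumably be the 
one-liner below (a sketch of the suggestion, not code from the PR):
   ```
   override def outputPartitioning: Partitioning = child.outputPartitioning
   ```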



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325533208
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -91,6 +91,7 @@ case class AdaptiveSparkPlanExec(
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
     ReuseAdaptiveSubquery(conf, subqueryCache),
+    OptimizedLocalShuffleReader(conf),
 
 Review comment:
   since this may change the number of exchanges, we should put it in 
`queryStagePreparationRules`. Then the AQE framework can check the cost and 
give up the optimization if extra shuffle exchanges are introduced.
   
   Note that making it a physical rule and checking the number of exchanges at 
the end is suboptimal: it's possible that the local shuffle reader can avoid 
exchanges downstream, which changes the stage boundaries.
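   
   For illustration, such a cost check could look roughly like the sketch 
below (the helper name and wiring are assumed, not the actual AQE framework 
code):
   ```
   // Apply the optimization speculatively, re-run EnsureRequirements, and
   // fall back to the original plan if extra shuffle exchanges would appear.
   def applyIfNoExtraShuffles(plan: SparkPlan, conf: SQLConf): SparkPlan = {
     def countShuffles(p: SparkPlan): Int =
       p.collect { case e: ShuffleExchangeExec => e }.size
     val optimized = OptimizedLocalShuffleReader(conf).apply(plan)
     val withRequirements = EnsureRequirements(conf).apply(optimized)
     if (countShuffles(withRequirements) > countShuffles(plan)) plan
     else optimized
   }
   ```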





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325530214
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = {
+    shuffleStage match {
+      case stage: ShuffleQueryStageExec =>
+        stage.isLocalShuffle = false
+      case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+        stage.isLocalShuffle = false
+    }
+    shuffleStage
+  }
+
+  private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = {
+    val revertPlan = newPlan.transformUp {
+      case localReader: LocalShuffleReaderExec
+        if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) =>
+        setIsLocalToFalse(localReader.child)
+    }
+    revertPlan
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    // Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return.
+    val bhjs = plan.collect {
+      case bhj: BroadcastHashJoinExec => bhj
+    }
+
+    if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) {
+      return plan
+    }
+
+    // If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true
+    bhjs.map {
+      case bhj: BroadcastHashJoinExec =>
+        bhj.children map {
+          case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true
+          case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+            stage.isLocalShuffle = true
+          case plan: SparkPlan => plan
+        }
+    }
+
+    // Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true
+    val newPlan = plan.transformUp {
+      case stage: ShuffleQueryStageExec if (stage.isLocalShuffle) =>
+        LocalShuffleReaderExec(stage)
+      case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) if (stage.isLocalShuffle) =>
 
 Review comment:
   let's not strip the `ReusedQueryStageExec`
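   
   That is, the reuse node would be wrapped as a whole rather than replaced by 
its inner stage, roughly (a sketch assuming the node shapes quoted above):
   ```
   val newPlan = plan.transformUp {
     case stage: ShuffleQueryStageExec if stage.isLocalShuffle =>
       LocalShuffleReaderExec(stage)
     // Keep the ReusedQueryStageExec wrapper intact and wrap it as a whole.
     case reused @ ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _)
         if stage.isLocalShuffle =>
       LocalShuffleReaderExec(reused)
   }
   ```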





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325529240
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = {
+    shuffleStage match {
+      case stage: ShuffleQueryStageExec =>
+        stage.isLocalShuffle = false
+      case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+        stage.isLocalShuffle = false
+    }
+    shuffleStage
+  }
+
+  private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = {
+    val revertPlan = newPlan.transformUp {
+      case localReader: LocalShuffleReaderExec
+        if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) =>
+        setIsLocalToFalse(localReader.child)
+    }
+    revertPlan
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    // Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return.
+    val bhjs = plan.collect {
+      case bhj: BroadcastHashJoinExec => bhj
+    }
+
+    if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) {
+      return plan
+    }
+
+    // If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true
+    bhjs.map {
+      case bhj: BroadcastHashJoinExec =>
+        bhj.children map {
+          case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true
+          case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+            stage.isLocalShuffle = true
+          case plan: SparkPlan => plan
+        }
+    }
+
+    // Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true
+    val newPlan = plan.transformUp {
 
 Review comment:
   Why don't we traverse the tree once?
   ```
   def isShuffleStage(plan: SparkPlan): Boolean = plan match {
     case _: ShuffleQueryStageExec => true
     case ReusedQueryStageExec(_, _: ShuffleQueryStageExec, _) => true
     case _ => false
   }
   
   def canUseLocalShuffleReaderLeft(j: BroadcastHashJoinExec): Boolean = {
     j.buildSide == BuildLeft && isShuffleStage(j.left)
   }
   
   def canUseLocalShuffleReaderRight ...
   ...
   plan transformDown {
     case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) =>
       val localShuffleReader = ...
       join.copy(left = localShuffleReader)
   
     ...
   }
   ```





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325526025
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = {
+    shuffleStage match {
+      case stage: ShuffleQueryStageExec =>
+        stage.isLocalShuffle = false
 
 Review comment:
   if possible, let's not add mutable state to the plan
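   
   One way to avoid the mutable flag is to wrap the shuffle-stage children of 
the join directly in a single transform, e.g. (a sketch building on the 
single-pass idea from the other thread; no `isLocalShuffle` state needed):
   ```
   val newPlan = plan.transformDown {
     case join: BroadcastHashJoinExec =>
       val newChildren = join.children.map {
         case stage: ShuffleQueryStageExec =>
           LocalShuffleReaderExec(stage)
         case reused @ ReusedQueryStageExec(_, _: ShuffleQueryStageExec, _) =>
           LocalShuffleReaderExec(reused)
         case other => other
       }
       // withNewChildren returns a new node, so no stage is mutated in place.
       join.withNewChildren(newChildren)
   }
   ```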





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-18 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r325525673
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
 
 Review comment:
   nit: this should be a verb: `OptimizeLocalShuffleReader`





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-12 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r323609170
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
 ##
 @@ -180,25 +180,45 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
 
 case class CoalescedShuffleReaderExec(
     child: QueryStageExec,
-    partitionStartIndices: Array[Int]) extends UnaryExecNode {
+    partitionStartIndices: Array[Int],
+    var isLocal: Boolean = false) extends UnaryExecNode {
 
 Review comment:
   without the local shuffle reader, a task of `ShuffledRDD` reads the shuffle 
blocks `map1-reduce1`, `map2-reduce1`, etc. With the local shuffle reader, the 
task reads `map1-reduce1`, `map1-reduce2`, etc. The task output data size is 
different, so we can't use the algorithm in `ReduceNumShufflePartitions` 
anymore.
   
   Furthermore, the RDD's numPartitions also becomes different after switching 
to the local shuffle reader; how can we apply `ReduceNumShufflePartitions` 
then?
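   
   A toy example of the mismatch (made-up block sizes in MB; rows are map 
tasks, columns are reduce partitions):
   ```
   val blockSizes = Array(
     Array(10, 1, 1), // map1's blocks: map1-reduce1, map1-reduce2, map1-reduce3
     Array(1, 10, 1)  // map2's blocks
   )
   // Normal reader: one task per reduce partition, summing a column over maps.
   val postShuffleSizes = blockSizes.transpose.map(_.sum) // Array(11, 11, 2)
   // Local reader: one task per map, summing a row over reduce partitions.
   val localReaderSizes = blockSizes.map(_.sum)           // Array(12, 12)
   ```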





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-09-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r323223769
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
 ##
 @@ -180,25 +180,45 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
 
 case class CoalescedShuffleReaderExec(
     child: QueryStageExec,
-    partitionStartIndices: Array[Int]) extends UnaryExecNode {
+    partitionStartIndices: Array[Int],
+    var isLocal: Boolean = false) extends UnaryExecNode {
 
 Review comment:
   `ReduceNumShufflePartitions` and the local shuffle reader are two different 
optimizations, and they conflict: `ReduceNumShufflePartitions` adjusts the 
numPartitions by assuming the partitions are post-shuffle partitions, whose 
data size depends on the shuffle blocks they need to read. If we change the 
shuffle to a local shuffle reader, the partitions become pre-shuffle 
partitions, and their data size is different.
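   
   A small numeric sketch of the consequence (values assumed): for a shuffle 
with 100 map tasks and 10 reducers, the coalescing logic reasons about 10 
post-shuffle partitions, while a local reader yields one partition per map:
   ```
   val numMaps = 100   // pre-shuffle (map-side) partitions
   val numReduces = 10 // post-shuffle (reduce-side) partitions
   // ReduceNumShufflePartitions computes partitionStartIndices over the
   // numReduces post-shuffle partitions; a local shuffle reader instead
   // produces numMaps partitions, so those indices no longer apply.
   val localReaderPartitions = numMaps // 100, not 10
   ```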




