[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r334792481

## File path: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ##
@@ -36,18 +36,33 @@ private[spark] class BlockStoreShuffleReader[K, C](
     readMetrics: ShuffleReadMetricsReporter,
     serializerManager: SerializerManager = SparkEnv.get.serializerManager,
     blockManager: BlockManager = SparkEnv.get.blockManager,
-    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
+    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
+    mapId: Option[Int] = None)
   extends ShuffleReader[K, C] with Logging {
   private val dep = handle.dependency
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+    val blocksByAddress = mapId match {
+      case (Some(mapId)) => mapOutputTracker.getMapSizesByExecutorId(
+        handle.shuffleId,
+        startPartition,
+        endPartition,
+        mapId)
+      case (None) => mapOutputTracker.getMapSizesByExecutorId(
+        handle.shuffleId,
+        startPartition,
+        endPartition)
+      case (_) => throw new IllegalArgumentException(

Review comment:
nit:
```
case Some(..) =>
case None =>
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
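The nit above boils down to matching on the `Option` directly. `Option` is sealed, so `Some` and `None` already cover every value: a trailing `case _` can never match, the parentheses around each pattern add nothing, and if a case is left out the compiler warns that the match may not be exhaustive. The stand-alone sketch below shows the shape. `describeFetch` is a hypothetical stand-in for the two `getMapSizesByExecutorId` calls, which need a running `SparkEnv`.

```scala
object OptionMatchSketch {
  // Hypothetical stand-in for the two getMapSizesByExecutorId calls: it only
  // reports which variant would have been used.
  def describeFetch(shuffleId: Int, start: Int, end: Int, mapId: Option[Int]): String =
    mapId match {
      // Option is sealed, so Some/None are exhaustive: no `case _` is needed,
      // and the patterns need no extra parentheses. Binding to `id` also
      // avoids shadowing the outer `mapId`.
      case Some(id) => s"shuffle $shuffleId, map $id, partitions [$start, $end)"
      case None     => s"shuffle $shuffleId, all maps, partitions [$start, $end)"
    }
}
```

For example, `describeFetch(0, 0, 4, Some(2))` takes the single-mapper branch, and `describeFetch(0, 0, 4, None)` takes the all-mappers branch.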
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r334379189

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala ##
@@ -221,7 +230,8 @@ class AdaptiveQueryExecSuite
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 2)
       val ex = findReusedExchange(adaptivePlan)
-      assert(ex.size == 1)
+      // The ReusedExchange is hidden in LocalShuffleReaderExec

Review comment:
We should update `findReusedExchange` and make it go through `LocalShuffleReaderExec`.
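The exchange is "hidden" because a leaf-like wrapper reports no children, so a generic tree traversal never looks inside it. The suggested fix is for the helper to unwrap that node explicitly. Below is a minimal sketch on a toy plan tree. `Plan`, `Join`, `Scan`, `ReusedExchange` and `LocalReader` are hypothetical stand-ins for `SparkPlan`, `ReusedExchangeExec` and `LocalShuffleReaderExec`, and `collect` only imitates the spirit of `TreeNode.collect`.

```scala
object ReusedExchangeSketch {
  // Toy plan nodes (hypothetical). `children` drives the generic traversal.
  sealed trait Plan { def children: Seq[Plan] }
  case class Join(left: Plan, right: Plan) extends Plan { def children: Seq[Plan] = Seq(left, right) }
  case class Scan(table: String) extends Plan { def children: Seq[Plan] = Nil }
  case class ReusedExchange(id: Int) extends Plan { def children: Seq[Plan] = Nil }
  // Behaves like a leaf: it wraps a stage but reports no children.
  case class LocalReader(stage: Plan) extends Plan { def children: Seq[Plan] = Nil }

  // Pre-order collect over `children`.
  def collect[T](plan: Plan)(pf: PartialFunction[Plan, T]): Seq[T] =
    pf.lift(plan).toList ++ plan.children.flatMap(c => collect(c)(pf))

  // Naive helper: misses exchanges wrapped by LocalReader.
  def findReusedExchangeNaive(plan: Plan): Seq[ReusedExchange] =
    collect(plan) { case e: ReusedExchange => e }

  // Updated helper: explicitly descends into the stage a LocalReader wraps.
  def findReusedExchange(plan: Plan): Seq[ReusedExchange] =
    collect(plan) {
      case e: ReusedExchange => Seq(e)
      case LocalReader(stage) => findReusedExchange(stage)
    }.flatten

  val samplePlan: Plan = Join(LocalReader(Join(Scan("a"), ReusedExchange(1))), Scan("b"))
}
```

On `samplePlan`, the naive helper finds nothing, and the updated helper finds `ReusedExchange(1)` inside the wrapper.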
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r334377854

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala ##
@@ -355,6 +365,28 @@ class AdaptiveQueryExecSuite
     }
   }
+  test("Change merge join to broadcast join and optimize the shuffle" +
+    " reader to local shuffle reader") {

Review comment:
How about "Change merge join to broadcast join without local shuffle reader"?
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r334369858

## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ##
@@ -893,31 +979,59 @@ private[spark] object MapOutputTracker extends Logging {
       startPartition: Int,
       endPartition: Int,
       statuses: Array[MapStatus],
-      useOldFetchProtocol: Boolean): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+      useOldFetchProtocol: Boolean,
+      mapId : Option[Int] = None): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
     assert (statuses != null)
     val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
-    for ((status, mapIndex) <- statuses.iterator.zipWithIndex) {
-      if (status == null) {
-        val errorMessage = s"Missing an output location for shuffle $shuffleId"
-        logError(errorMessage)
-        throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
-      } else {
-        for (part <- startPartition until endPartition) {
-          val size = status.getSizeForBlock(part)
-          if (size != 0) {
-            if (useOldFetchProtocol) {
-              // While we use the old shuffle fetch protocol, we use mapIndex as mapId in the
-              // ShuffleBlockId.
-              splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
-                ((ShuffleBlockId(shuffleId, mapIndex, part), size, mapIndex))
-            } else {
-              splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
-                ((ShuffleBlockId(shuffleId, status.mapTaskId, part), size, mapIndex))
+    mapId match {
+      case (Some(mapId)) =>
+        for ((status, mapIndex) <- statuses.iterator.zipWithIndex.filter(_._2 == mapId)) {

Review comment:
nit:
```
val iter = statuses.iterator.zipWithIndex
for ((status, mapIndex) <- mapId.map(id => iter.filter(_._2 == id)).getOrElse(iter))
```
Then we can save a lot of duplicated code.
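The one-liner works because `Option.map(...).getOrElse(...)` chooses between a filtered iterator and the unfiltered one. The loop body is then written only once, not once per `match` branch. Here is a self-contained sketch, with hypothetical string statuses in place of `MapStatus`. An iterator can be traversed only once, which is fine here because the `for` loop consumes it a single time.

```scala
object MapIdFilterSketch {
  // Hypothetical per-mapper statuses; in Spark these would be MapStatus objects.
  val statuses: Array[String] = Array("status-0", "status-1", "status-2")

  // One loop body for both cases: with a mapId, only that mapper's
  // (status, mapIndex) pair is visited; without one, every pair is.
  def selectStatuses(mapId: Option[Int]): Seq[(String, Int)] = {
    val iter = statuses.iterator.zipWithIndex
    val selected = mapId.map(id => iter.filter(_._2 == id)).getOrElse(iter)
    val out = Seq.newBuilder[(String, Int)]
    for ((status, mapIndex) <- selected) {
      out += ((status, mapIndex)) // the shared per-mapper logic would go here
    }
    out.result()
  }
}
```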
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r334366021

## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ##
@@ -669,6 +685,31 @@ private[spark] class MapOutputTrackerMaster(
     None
   }
+  /**
+   * Return the locations where the Mapper(s) ran. The locations each includes both a host and an

Review comment:
The doc should be updated, since we now get only one location, for a single mapper.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333946016

## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ##
@@ -749,6 +818,26 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     }
   }
+  override def getMapSizesByExecutorId(
+      shuffleId: Int,
+      startPartition: Int,
+      endPartition: Int,
+      mapId: Int,
+      useOldFetchProtocol: Boolean) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    logDebug(s"Fetching outputs for shuffle $shuffleId, mapId $mapId" +
+      s"partitions $startPartition-$endPartition")
+    val statuses = getStatuses(shuffleId)

Review comment:
Since we need to get all the shuffle statuses anyway, we can call the existing `convertMapStatuses` and add an extra filter that only collects the blocks whose mapIndex is the one we want.
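In the quoted `convertMapStatuses` code, the third element of each `(BlockId, Long, Int)` tuple is the map index. Selecting one mapper therefore reduces to a filter over the existing output. The sketch below rests on a few assumptions. Addresses and block ids are plain strings here; in Spark they are `BlockManagerId` and `BlockId`. `allBlocks` is hypothetical sample data standing in for the converted statuses. Addresses left with no blocks after filtering are dropped.

```scala
object FilterByMapIndexSketch {
  // (blockId, size, mapIndex), mirroring the (BlockId, Long, Int) tuples.
  type Block = (String, Long, Int)

  // Hypothetical output of the existing conversion over all map statuses.
  def allBlocks: Iterator[(String, Seq[Block])] = Iterator(
    "exec-1" -> Seq(("shuffle_0_0_0", 10L, 0), ("shuffle_0_0_1", 20L, 0)),
    "exec-2" -> Seq(("shuffle_0_1_0", 30L, 1))
  )

  // Keep only the blocks written by `mapIndex`; drop addresses left empty.
  def blocksForMapper(
      all: Iterator[(String, Seq[Block])],
      mapIndex: Int): Iterator[(String, Seq[Block])] =
    all.map { case (address, blocks) => (address, blocks.filter(_._3 == mapIndex)) }
      .filter { case (_, blocks) => blocks.nonEmpty }
}
```

Because all statuses are fetched either way, the extra cost is one pass over blocks that have already been materialized.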
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333943901

## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ##
@@ -669,6 +685,34 @@ private[spark] class MapOutputTrackerMaster(
     None
   }
+  /**
+   * Return the locations where the Mapper(s) ran. The locations each includes both a host and an
+   * executor id on that host.
+   *
+   * @param dep shuffle dependency object
+   * @param startMapId the start map id
+   * @param endMapId the end map id

Review comment:
We shouldn't add code that is only for future use. For now we only need a single `mapId`, IIUC.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333943188

## File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala ##
@@ -54,6 +54,19 @@ private[spark] trait ShuffleManager {
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
+   * read from mapId.
+   * Called on executors by reduce tasks.
+   */
+  def getMapReader[K, C](

Review comment:
Instead of creating a new API, how about we set the map id in `TaskContext.getLocalProperties` and still call the existing `getReader` method?
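The suggestion keeps a single reader entry point and passes the map id out of band. In Spark, the driver can set such values with `SparkContext.setLocalProperty`, and a task reads them back through its `TaskContext`. The sketch below models only the lookup side, using a plain `java.util.Properties`. The property key and `describeReader` are hypothetical, not names from the PR: an absent key means the regular reader, and a present key selects the single-mapper path.

```scala
import java.util.Properties

object LocalPropertySketch {
  // Hypothetical property key; the PR does not define one.
  val MapIdKey = "hypothetical.local.shuffle.reader.mapId"

  // Stand-in for the existing getReader: one signature for every caller, with
  // the optional map id looked up in the task's local properties.
  def describeReader(shuffleId: Int, props: Properties): String =
    Option(props.getProperty(MapIdKey)).map(_.toInt) match {
      case Some(mapId) => s"local reader for shuffle $shuffleId, map $mapId"
      case None        => s"regular reader for shuffle $shuffleId"
    }
}
```

Wrapping `getProperty` in `Option(...)` turns the Java `null` for a missing key into `None`. Callers that never set the key therefore keep the existing behavior.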
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333941162

## File path: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ##
@@ -36,19 +36,35 @@ private[spark] class BlockStoreShuffleReader[K, C](
     readMetrics: ShuffleReadMetricsReporter,
     serializerManager: SerializerManager = SparkEnv.get.serializerManager,
     blockManager: BlockManager = SparkEnv.get.blockManager,
-    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
+    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
+    mapId: Option[Int] = None)
   extends ShuffleReader[K, C] with Logging {
   private val dep = handle.dependency
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+    val blocksByAddress = (mapId) match {

Review comment:
`(mapId)` -> `mapId`
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333940553

## File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala ##
@@ -54,6 +54,19 @@ private[spark] trait ShuffleManager {
       context: TaskContext,
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
+   * read from mapId.
+   * Called on executors by reduce tasks.
+   */
+  def getMapReader[K, C](
+      handle: ShuffleHandle,
+      startPartition: Int,
+      endPartition: Int,

Review comment:
Now the local shuffle reader always reads all the shuffle blocks of one mapper. Let's add the above 2 parameters later, when we are going to use them.
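To make "always reads all the shuffle blocks of one mapper" concrete: without a partition range, the set of blocks for a mapper is fixed by its index and the number of reducers, always `0 until numReducers`. The sketch uses a toy size matrix. The data and names are hypothetical; in Spark the sizes come from `MapStatus.getSizeForBlock`. Empty blocks are skipped, as in the quoted `size != 0` check.

```scala
object OneMapperBlocksSketch {
  // sizes(mapIndex)(reduceId): hypothetical shuffle block sizes in bytes.
  val sizes: Array[Array[Long]] = Array(
    Array(10L, 0L, 5L),
    Array(0L, 7L, 3L)
  )

  // Every non-empty block written by one mapper, as (reduceId, size) pairs.
  // No start/end partition parameters are needed: the range is always
  // 0 until numReducers.
  def blocksOfMapper(mapIndex: Int): Seq[(Int, Long)] = {
    val numReducers = sizes(mapIndex).length
    (0 until numReducers).map(r => (r, sizes(mapIndex)(r))).filter(_._2 != 0L)
  }
}
```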
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333940060

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ##
@@ -2137,6 +2145,8 @@ class SQLConf extends Serializable with Logging {
   def maxNumPostShufflePartitions: Int =
     getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
+  def optimizedLocalShuffleReaderEnabled: Boolean = getConf(OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED)

Review comment:
It's only called once, so we don't need to create a method.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333939935

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ##
@@ -388,6 +388,14 @@ object SQLConf {
       "must be a positive integer.")
     .createOptional
+  val OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED =

Review comment:
`OPTIMIZED` -> `OPTIMIZE`
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333935255

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala ##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
+
+
+/**
+ * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition
+ * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle partitions
+ * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, inclusive).
+ */
+private final class LocalShuffleRowRDDPartition(
+    val preShufflePartitionIndex: Int,
+    val startPostShufflePartitionIndex: Int,

Review comment:
This is not used anywhere.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333934898

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala ##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
+
+
+/**
+ * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition
+ * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle partitions
+ * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, inclusive).
+ */
+private final class LocalShuffleRowRDDPartition(

Review comment:
`ShuffleRow` -> `ShuffledRow`
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333934690

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala ##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
+
+
+/**
+ * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition
+ * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle partitions
+ * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, inclusive).
+ */
+private final class LocalShuffleRowRDDPartition(
+    val preShufflePartitionIndex: Int,
+    val startPostShufflePartitionIndex: Int,
+    val endPostShufflePartitionIndex: Int) extends Partition {
+  override val index: Int = preShufflePartitionIndex
+}
+
+/**
+ * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used
+ * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime
+ * because the map output of one input table is small enough for broadcast. This RDD represents the
+ * data of another input table of the join that reads from shuffle. Each partition of the RDD reads
+ * the whole data from just one mapper output locally. So actually there is no data transferred
+ * from the network.
+
+ * This RDD takes a [[ShuffleDependency]] (`dependency`).
+ *
+ * The `dependency` has the parent RDD of this RDD, which represents the dataset before shuffle
+ * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs.
+ * Partition ids should be in the range [0, numPartitions - 1].
+ * `dependency.partitioner.numPartitions` is the number of pre-shuffle partitions. (i.e. the number
+ * of partitions of the map output). The post-shuffle partition number is the same to the parent
+ * RDD's partition number.
+ */
+class LocalShuffledRowRDD(
+    var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
+    metrics: Map[String, SQLMetric])
+  extends RDD[InternalRow](dependency.rdd.context, Nil) {
+
+  private[this] val numReducers = dependency.partitioner.numPartitions
+  private[this] val numMappers = dependency.rdd.partitions.length
+
+  override def getDependencies: Seq[Dependency[_]] = List(dependency)
+
+  override def getPartitions: Array[Partition] = {
+
+    Array.tabulate[Partition](numMappers) { i =>
+      new LocalShuffleRowRDDPartition(i, 0, numReducers)
+    }
+  }
+
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]

Review comment:
Why not just use `dependency` directly?
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333932293

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala ##
@@ -125,6 +130,10 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 1)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
+      val localReaders = adaptivePlan.collect {
+        case reader: LocalShuffleReaderExec => reader
+      }
+      assert(localReaders.length === 1)

Review comment:
We can add a method:
```
def checkNumLocalShuffleReaders(plan: SparkPlan, expected: Int): Unit = ...
```
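The suggested helper folds the repeated collect-then-assert boilerplate into one call per test. The sketch below shows the idea on a toy tree. `Plan`, `Join`, `Stage` and `LocalReader` are hypothetical stand-ins for `SparkPlan`, `QueryStageExec` and `LocalShuffleReaderExec`. Only the `checkNumLocalShuffleReaders` signature comes from the review. Putting both counts in the failure message makes a failing test easier to read.

```scala
object LocalReaderCountSketch {
  // Toy plan tree (hypothetical stand-ins for SparkPlan and friends).
  sealed trait Plan { def children: Seq[Plan] }
  case class Join(left: Plan, right: Plan) extends Plan { def children: Seq[Plan] = Seq(left, right) }
  case class Stage(id: Int) extends Plan { def children: Seq[Plan] = Nil }
  case class LocalReader(stage: Stage) extends Plan { def children: Seq[Plan] = Nil }

  // Collect every LocalReader in the tree.
  private def collectLocalReaders(plan: Plan): Seq[LocalReader] = plan match {
    case r: LocalReader => Seq(r)
    case other          => other.children.flatMap(collectLocalReaders)
  }

  // One call replaces the collect-then-assert lines repeated in each test.
  def checkNumLocalShuffleReaders(plan: Plan, expected: Int): Unit = {
    val numLocalReaders = collectLocalReaders(plan).length
    assert(numLocalReaders == expected,
      s"expected $expected local shuffle readers but found $numLocalReaders")
  }

  val samplePlan: Plan = Join(LocalReader(Stage(0)), Stage(1))
}
```

A test would then call `checkNumLocalShuffleReaders(adaptivePlan, 1)` instead of collecting and asserting inline.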
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333927414

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+      (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+      (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+    plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+    }
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizedLocalShuffleReaderEnabled) {
+      return plan
+    }
+
+    val optimizedPlan = plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+        join.copy(right = localReader)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+        join.copy(left = localReader)
+    }
+
+    val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+    val numExchanges = afterEnsureRequirements.collect {
+      case e: ShuffleExchangeExec => e
+    }.length
+
+    if (numExchanges > 0) {
+      logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+        " to additional shuffles will be introduced.")
+      revertLocalShuffleReader(optimizedPlan)
+    } else {
+      optimizedPlan
+    }
+  }
+}
+
+case class LocalShuffleReaderExec(
+    child: QueryStageExec) extends LeafExecNode {

Review comment:
nit: we can put it on the previous line.
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333926473 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { +(join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) || + (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec]) + } + + def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { +(join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) || + (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec]) + } + + def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = { +plan.transformDown { + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => +join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child) + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => +join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child) +} + } + + override def apply(plan: SparkPlan): SparkPlan = { +if (!conf.optimizedLocalShuffleReaderEnabled) { + return plan +} + +val optimizedPlan = plan.transformDown { + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => +val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]) 
+join.copy(right = localReader) + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => +val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec]) +join.copy(left = localReader) +} + +val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan) + +val numExchanges = afterEnsureRequirements.collect { + case e: ShuffleExchangeExec => e +}.length + +if (numExchanges > 0) { + logWarning("OptimizeLocalShuffleReader rule is not applied due" + +" to additional shuffles will be introduced.") + revertLocalShuffleReader(optimizedPlan) +} else { + optimizedPlan +} + } +} + +case class LocalShuffleReaderExec( +child: QueryStageExec) extends LeafExecNode { + + override def output: Seq[Attribute] = child.output + + override def doCanonicalize(): SparkPlan = child.canonicalized + + override def outputPartitioning: Partitioning = { + +def canUseChildPartitioning(stage: ShuffleQueryStageExec): Partitioning = { Review comment: The method name sounds like it returns a boolean. How about `tryReserveChildPartitioning`?
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333926303 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { +(join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) || + (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec]) + } + + def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { +(join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) || + (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec]) + } + + def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = { +plan.transformDown { + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => +join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child) + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => +join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child) +} + } + + override def apply(plan: SparkPlan): SparkPlan = { +if (!conf.optimizedLocalShuffleReaderEnabled) { + return plan +} + +val optimizedPlan = plan.transformDown { + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => +val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]) 
+join.copy(right = localReader) + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => +val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec]) +join.copy(left = localReader) +} + +val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan) + +val numExchanges = afterEnsureRequirements.collect { + case e: ShuffleExchangeExec => e +}.length + +if (numExchanges > 0) { + logWarning("OptimizeLocalShuffleReader rule is not applied due" + +" to additional shuffles will be introduced.") + revertLocalShuffleReader(optimizedPlan) +} else { + optimizedPlan +} + } +} + +case class LocalShuffleReaderExec( +child: QueryStageExec) extends LeafExecNode { + + override def output: Seq[Attribute] = child.output + + override def doCanonicalize(): SparkPlan = child.canonicalized + + override def outputPartitioning: Partitioning = { + +def canUseChildPartitioning(stage: ShuffleQueryStageExec): Partitioning = { + val initialPartitioning = stage.plan.asInstanceOf[ShuffleExchangeExec] Review comment: `.asInstanceOf[ShuffleExchangeExec]` is not necessary.
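The sketch below shows why the cast can be dropped. It assumes that `ShuffleQueryStageExec.plan` is already statically typed as `ShuffleExchangeExec`, which the uncast `stage.plan.shuffleDependency` calls elsewhere in this diff suggest. All classes here are hypothetical stand-ins, not Spark's:

```scala
object NoCastNeeded {
  // Hypothetical stand-ins for SparkPlan and ShuffleExchangeExec.
  abstract class Node { def name: String }
  case class ShuffleNode(numPartitions: Int) extends Node { def name: String = "shuffle" }

  // Hypothetical stand-ins for QueryStageExec and ShuffleQueryStageExec.
  abstract class Stage { def plan: Node }
  // The case-class parameter implements the abstract `plan` with a narrower type,
  // so code holding a ShuffleStage sees `plan: ShuffleNode` statically.
  case class ShuffleStage(plan: ShuffleNode) extends Stage

  // No asInstanceOf needed: `stage.plan` is already a ShuffleNode here.
  def partitions(stage: ShuffleStage): Int = stage.plan.numPartitions
}
```

When the caller only holds the wider `Stage` type, a pattern such as `case s: ShuffleStage => s.plan.numPartitions` recovers the narrower type without a cast. This is the same shape as the `case stage: ShuffleQueryStageExec =>` matches in the diff.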
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333901952 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { +(join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) || + (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec]) + } + + def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { +(join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) || + (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec]) + } + + def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = { +plan.transformDown { + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => +join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child) + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => +join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child) +} + } + + override def apply(plan: SparkPlan): SparkPlan = { +if (!conf.optimizedLocalShuffleReaderEnabled) { + return plan +} + +val optimizedPlan = plan.transformDown { + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => +val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]) 
+join.copy(right = localReader) + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => +val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec]) +join.copy(left = localReader) +} + +val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan) + +val numExchanges = afterEnsureRequirements.collect { + case e: ShuffleExchangeExec => e +}.length + +if (numExchanges > 0) { Review comment: The query plan will very likely contain exchanges already; what we need to guarantee here is that we don't introduce extra exchanges. We should do:
```
def numExchanges...
val numExchangesBefore = numExchanges(EnsureRequirements(conf).apply(plan))
val numExchangesAfter = numExchanges(EnsureRequirements(conf).apply(optimizedPlan))
if (numExchangesAfter > numExchangesBefore) {
  plan
} else {
  optimizedPlan
}
```
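The before/after comparison can be sketched on a toy plan tree. The node types and the `ensureRequirements` function parameter are hypothetical stand-ins for `SparkPlan`, `ShuffleExchangeExec`, and `EnsureRequirements(conf).apply`. This is not Spark's implementation:

```scala
object ExchangeGuard {
  // A toy physical-plan tree; these node types are hypothetical stand-ins
  // for SparkPlan and ShuffleExchangeExec, not Spark's classes.
  sealed trait Plan { def children: Seq[Plan] }
  case class Scan(table: String) extends Plan { def children: Seq[Plan] = Nil }
  case class Exchange(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
  case class Join(left: Plan, right: Plan) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
  }

  // Count every exchange node in the tree.
  def numExchanges(plan: Plan): Int = plan match {
    case Exchange(child) => 1 + numExchanges(child)
    case other           => other.children.map(numExchanges).sum
  }

  // Keep `optimizedPlan` unless it needs more exchanges than the original `plan`.
  // `ensureRequirements` stands in for EnsureRequirements(conf).apply.
  def choosePlan(plan: Plan, optimizedPlan: Plan, ensureRequirements: Plan => Plan): Plan = {
    val numExchangesBefore = numExchanges(ensureRequirements(plan))
    val numExchangesAfter = numExchanges(ensureRequirements(optimizedPlan))
    if (numExchangesAfter > numExchangesBefore) plan else optimizedPlan
  }
}
```

Consider a plan that already holds two exchanges and is rewritten into another two-exchange plan. Under this guard the rewrite is kept, whereas the `numExchanges > 0` check in the diff would reject it. A rewrite that adds a third exchange falls back to the original `plan` directly, so no revert step is needed.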
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333900101 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { +(join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) || + (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec]) + } + + def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { +(join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) || + (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec]) + } + + def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = { +plan.transformDown { + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => +join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child) + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => +join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child) +} + } + + override def apply(plan: SparkPlan): SparkPlan = { +if (!conf.optimizedLocalShuffleReaderEnabled) { + return plan +} + +val optimizedPlan = plan.transformDown { + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) => +val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]) 
+join.copy(right = localReader) + case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) => +val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec]) +join.copy(left = localReader) +} + +val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan) + +val numExchanges = afterEnsureRequirements.collect { + case e: ShuffleExchangeExec => e +}.length + +if (numExchanges > 0) { + logWarning("OptimizeLocalShuffleReader rule is not applied due" + +" to additional shuffles will be introduced.") + revertLocalShuffleReader(optimizedPlan) Review comment: We don't need to revert; just return the original `plan`.
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r332436377 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { +join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right) + } + + def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { +join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left) + } + + override def apply(plan: SparkPlan): SparkPlan = { +if (!conf.optimizedLocalShuffleReaderEnabled) { + return plan +} + +plan.transformDown{ + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) => +val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]) +join.copy(right = localReader) + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) => +val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec]) +join.copy(left = localReader) +} + } Review comment: I spent more time thinking about the cost framework, and I don't think we can fully rely on it right now. For example, if `rule1` does something good and `rule2` does something negative, the current cost framework in AQE simply discards the entire re-optimization, so neither `rule1` nor `rule2` takes effect. What we want is for `rule1` to take effect while `rule2` does not. I think each rule should do its best to make sure that it is always a positive change on its own.
For this particular rule, we can run `EnsureRequirements` on both the original plan and the updated plan, and only keep the updated plan if the number of shuffles doesn't increase.
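The difference between the two strategies can be sketched with a toy model. The plan representation, the cost function, and both rules below are hypothetical illustrations, not AQE's actual cost evaluator. An all-or-nothing check drops the good `rule1` together with the bad `rule2`, while a per-rule check keeps `rule1`:

```scala
object PerRuleGuard {
  // Hypothetical toy model: a "plan" is a list of operator names and its cost
  // is the number of "shuffle" operators. Not Spark's cost model.
  type Plan = Vector[String]
  type Rule = Plan => Plan

  def cost(plan: Plan): Int = plan.count(_ == "shuffle")

  // rule1 is beneficial: it turns a shuffle read into a local read.
  val rule1: Rule = _.map(op => if (op == "shuffle") "localRead" else op)
  // rule2 is harmful: it introduces two extra shuffles.
  val rule2: Rule = _ ++ Vector("shuffle", "shuffle")

  // All-or-nothing check: apply every rule, then discard everything if the cost grew.
  def allOrNothing(plan: Plan, rules: Seq[Rule]): Plan = {
    val result = rules.foldLeft(plan)((p, rule) => rule(p))
    if (cost(result) > cost(plan)) plan else result
  }

  // Per-rule check: each rule's result is kept only if that rule does not raise the cost.
  def perRule(plan: Plan, rules: Seq[Rule]): Plan =
    rules.foldLeft(plan) { (p, rule) =>
      val next = rule(p)
      if (cost(next) > cost(p)) p else next
    }
}
```

Starting from `Vector("scan", "shuffle", "join")`, the all-or-nothing check returns the input unchanged. The per-rule check returns `Vector("scan", "localRead", "join")`, which is the outcome the comment asks for.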
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r332434571 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ## @@ -74,7 +74,7 @@ case class AdaptiveSparkPlanExec( @transient private val optimizer = new RuleExecutor[LogicalPlan] { // TODO add more optimization rules override protected def batches: Seq[Batch] = Seq( - Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)) + // Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)) Review comment: Why disable it?
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r332433471 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala ## @@ -190,7 +190,7 @@ case class CoalescedShuffleReaderExec( UnknownPartitioning(partitionStartIndices.length) } - private var cachedShuffleRDD: ShuffledRowRDD = null + private var cachedShuffleRDD: RDD[InternalRow] = null Review comment: Why change this?
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r332433103 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { +join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right) + } + + def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { +join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left) + } + + override def apply(plan: SparkPlan): SparkPlan = { +if (!conf.optimizedLocalShuffleReaderEnabled) { + return plan +} + +plan.transformDown{ + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) => +val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]) +join.copy(right = localReader) + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) => +val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec]) +join.copy(left = localReader) +} + } +} + +case class LocalShuffleReaderExec( +child: QueryStageExec) extends LeafExecNode { + + override def output: Seq[Attribute] = child.output + + override def doCanonicalize(): SparkPlan = child.canonicalized + + override def outputPartitioning: Partitioning = { +val numPartitions = child match { + case stage: ShuffleQueryStageExec => +stage.plan.shuffleDependency.rdd.partitions.length + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => 
+stage.plan.shuffleDependency.rdd.partitions.length +} +UnknownPartitioning(numPartitions) + } + + private var cachedShuffleRDD: RDD[InternalRow] = null + + override protected def doExecute(): RDD[InternalRow] = { +if (cachedShuffleRDD == null) { + cachedShuffleRDD = child match { +case stage: ShuffleQueryStageExec => + stage.plan.createLocalShuffleRDD() +case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => + stage.plan.createLocalShuffleRDD() + } +} +cachedShuffleRDD + } + + override def generateTreeString( +depth: Int, Review comment: nit: use 4-space indentation for method parameters.
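The example below illustrates the layout this nit refers to, following the Scala style Spark uses. Wrapped method parameters are indented 4 spaces relative to `def`, while the method body is indented 2 spaces. `describe` is a hypothetical method used only to show the layout:

```scala
object IndentationStyle {
  def describe(
      depth: Int,
      name: String,
      verbose: Boolean): String = {
    // The body is indented 2 spaces; the wrapped parameters above are indented 4.
    val indent = "  " * depth
    if (verbose) s"$indent$name (verbose)" else s"$indent$name"
  }
}
```

The extra indentation keeps the parameter list visually distinct from the method body when both start on new lines.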
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r332432970 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { +join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right) + } + + def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { +join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left) + } + + override def apply(plan: SparkPlan): SparkPlan = { +if (!conf.optimizedLocalShuffleReaderEnabled) { + return plan +} + +plan.transformDown{ + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) => +val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]) +join.copy(right = localReader) + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) => +val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec]) +join.copy(left = localReader) +} + } +} + +case class LocalShuffleReaderExec( +child: QueryStageExec) extends LeafExecNode { + + override def output: Seq[Attribute] = child.output + + override def doCanonicalize(): SparkPlan = child.canonicalized + + override def outputPartitioning: Partitioning = { Review comment: For example, `LocalShuffleReaderExec(ShuffleQueryStage(scan))`, shouldn't we report `outputPartitioning` as `scan.outputPartitioning`? 
Each task of the `scan` operator produces the output of exactly one mapper, so the output partitioning of the local shuffle reader should be the same as that of the operator before the shuffle.
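A small Spark-free model can check this argument. All names are hypothetical, and a modulo "hash" stands in for the real partitioner. Each map task splits its input into one block per reducer. A regular reducer r gathers block r from every mapper, while a local reader partition m concatenates all blocks written by mapper m. Local partition m therefore holds exactly the rows of pre-shuffle partition m, only regrouped:

```scala
object LocalReadModel {
  type Block = Vector[Int]

  // Map side: split one input partition into `numReducers` blocks
  // using a simple modulo "hash" (a stand-in for the real partitioner).
  def writeMapOutput(input: Vector[Int], numReducers: Int): Vector[Block] =
    Vector.tabulate(numReducers)(r => input.filter(x => Math.floorMod(x, numReducers) == r))

  // Regular shuffle read: reducer r gathers block r from every mapper.
  def regularRead(outputs: Vector[Vector[Block]], r: Int): Block =
    outputs.flatMap(_(r))

  // Local shuffle read: partition m concatenates every block of mapper m.
  def localRead(outputs: Vector[Vector[Block]], m: Int): Block =
    outputs(m).flatten
}
```

The local reader has one partition per mapper, and its rows match the pre-shuffle partitions one to one. Under this model, reporting the partitioning of the operator before the shuffle seems consistent with what the reader produces.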
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r332404035 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { +join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right) + } + + def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { +join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left) + } + + override def apply(plan: SparkPlan): SparkPlan = { +if (!conf.optimizedLocalShuffleReaderEnabled) { + return plan +} + +plan.transformDown{ + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) => +val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec]) Review comment: We can't just call `.asInstanceOf[QueryStageExec]` here. `ShuffleQueryStageExec.isShuffleQueryStageExec` returns true for `ReusedQueryStageExec` as well.
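One way to handle both shapes is to pattern-match them and unwrap the reused wrapper explicitly instead of casting. This mirrors the `ShuffleQueryStageExec` / `ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _)` cases used elsewhere in this PR. The sketch below uses simplified, hypothetical stand-in classes, not Spark's `QueryStageExec` hierarchy:

```scala
object StageUnwrap {
  // Hypothetical stand-ins for the plan / query stage classes.
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  sealed trait QueryStage extends Plan
  final case class ShuffleStage(id: Int) extends QueryStage
  final case class ReusedStage(id: Int, stage: QueryStage) extends QueryStage

  // Same intent as `isShuffleQueryStageExec`: true for a shuffle stage
  // and for a reused wrapper around one.
  def isShuffleStage(p: Plan): Boolean = p match {
    case _: ShuffleStage => true
    case ReusedStage(_, _: ShuffleStage) => true
    case _ => false
  }

  // Instead of a cast, match both shapes and unwrap the reused stage;
  // anything else yields None rather than a wrongly typed value.
  def underlyingShuffleStage(p: Plan): Option[ShuffleStage] = p match {
    case s: ShuffleStage => Some(s)
    case ReusedStage(_, s: ShuffleStage) => Some(s)
    case _ => None
  }
}
```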
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r332402521 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = { +join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right) + } + + def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = { +join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left) + } + + override def apply(plan: SparkPlan): SparkPlan = { +if (!conf.optimizedLocalShuffleReaderEnabled) { + return plan +} + +plan.transformDown{ + case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) => Review comment: Shall we call it `canUseLocalShuffleReaderRight`? Here we wrap the right child with `LocalShuffleReaderExec`.
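With the suggested naming, each predicate is named after the child that gets wrapped, not after the build side it checks. A minimal sketch follows. `BuildLeft` and `BuildRight` are hypothetical stand-ins, and a plain boolean takes the place of `ShuffleQueryStageExec.isShuffleQueryStageExec`:

```scala
object LocalReaderSide {
  // Hypothetical stand-ins for the join build side.
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Named after the side that gets wrapped: when the left side is broadcast,
  // the right (streamed) side is the one whose shuffle output can be read locally.
  def canUseLocalShuffleReaderRight(buildSide: BuildSide, rightIsShuffleStage: Boolean): Boolean =
    buildSide == BuildLeft && rightIsShuffleStage

  // Symmetric case: the right side is broadcast, so the left side gets wrapped.
  def canUseLocalShuffleReaderLeft(buildSide: BuildSide, leftIsShuffleStage: Boolean): Boolean =
    buildSide == BuildRight && leftIsShuffleStage
}
```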
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r332401684 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark._ +import org.apache.spark.rdd.{RDD, ShuffledRDDPartition} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} + +/** + * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used + * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime + * because the map output of one input table is small enough for broadcast. This RDD represents the + * data of another input table of the join that reads from shuffle. Each partition of the RDD reads + * the whole data from just one mapper output locally. So actually there is no data transferred + * from the network. 
+ + * This RDD takes a [[ShuffleDependency]] (`dependency`). + * + * The `dependency` has the parent RDD of this RDD, which represents the dataset before shuffle + * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs. + * Partition ids should be in the range [0, numPartitions - 1]. + * `dependency.partitioner.numPartitions` is the number of pre-shuffle partitions. (i.e. the number + * of partitions of the map output). The post-shuffle partition number is the same to the parent + * RDD's partition number. + */ +class LocalShuffledRowRDD( + var dependency: ShuffleDependency[Int, InternalRow, InternalRow], + metrics: Map[String, SQLMetric], + specifiedPartitionStartIndices: Option[Array[Int]] = None, + specifiedPartitionEndIndices: Option[Array[Int]] = None) + extends RDD[InternalRow](dependency.rdd.context, Nil) { + + private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions + private[this] val numPostShufflePartitions = dependency.rdd.partitions.length Review comment: hmm, which "shuffle" are we talking about here? If the number of mappers is the number of post-shuffle partitions, then it means these mappers are also reducers and there is another shuffle upstream.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r326512378 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = { +shuffleStage match { + case stage: ShuffleQueryStageExec => +stage.isLocalShuffle = false + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => +stage.isLocalShuffle = false +} +shuffleStage + } + + private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = { +val revertPlan = newPlan.transformUp { + case localReader: LocalShuffleReaderExec +if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) => +setIsLocalToFalse(localReader.child) +} +revertPlan + } + + override def apply(plan: SparkPlan): SparkPlan = { +// Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return. 
+val bhjs = plan.collect { + case bhj: BroadcastHashJoinExec => bhj +} + +if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) { + return plan +} + +// If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true +bhjs.map { + case bhj: BroadcastHashJoinExec => +bhj.children map { + case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => +stage.isLocalShuffle = true + case plan: SparkPlan => plan +} +} + +// Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true +val newPlan = plan.transformUp { + case stage: ShuffleQueryStageExec if (stage.isLocalShuffle) => +LocalShuffleReaderExec(stage) + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) if (stage.isLocalShuffle) => +LocalShuffleReaderExec(stage) +} + +val afterEnsureRequirements = EnsureRequirements(conf).apply(newPlan) +val numExchanges = afterEnsureRequirements.collect { + case e: ShuffleExchangeExec => e +}.length +if (numExchanges > 0) { + logWarning("Local shuffle reader optimization is not applied due" + +" to additional shuffles will be introduced.") + revertLocalShuffleReader(newPlan) +} else { + newPlan +} + } +} + +case class LocalShuffleReaderExec( +child: QueryStageExec) extends UnaryExecNode { + + override def output: Seq[Attribute] = child.output + + override def doCanonicalize(): SparkPlan = child.canonicalized + + override def outputPartitioning: Partitioning = { Review comment: sorry, I mean `child.child.outputPartitioning`
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325545171 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark._ +import org.apache.spark.rdd.{RDD, ShuffledRDDPartition} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} + +/** + * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used + * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime + * because the map output of one input table is small enough for broadcast. This RDD represents the + * data of another input table of the join that reads from shuffle. Each partition of the RDD reads + * the whole data from just one mapper output locally. So actually there is no data transferred + * from the network. 
+ + * This RDD takes a [[ShuffleDependency]] (`dependency`). + * + * The `dependency` has the parent RDD of this RDD, which represents the dataset before shuffle + * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs. + * Partition ids should be in the range [0, numPartitions - 1]. + * `dependency.partitioner.numPartitions` is the number of pre-shuffle partitions. (i.e. the number + * of partitions of the map output). The post-shuffle partition number is the same to the parent + * RDD's partition number. + */ +class LocalShuffledRowRDD( + var dependency: ShuffleDependency[Int, InternalRow, InternalRow], + metrics: Map[String, SQLMetric], + specifiedPartitionStartIndices: Option[Array[Int]] = None, + specifiedPartitionEndIndices: Option[Array[Int]] = None) + extends RDD[InternalRow](dependency.rdd.context, Nil) { + + private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions + private[this] val numPostShufflePartitions = dependency.rdd.partitions.length + + private[this] val partitionStartIndices: Array[Int] = specifiedPartitionStartIndices match { +case Some(indices) => indices +case None => Array(0) + } + + private[this] val partitionEndIndices: Array[Int] = specifiedPartitionEndIndices match { +case Some(indices) => indices +case None if specifiedPartitionStartIndices.isEmpty => Array(numPreShufflePartitions) +case _ => specifiedPartitionStartIndices.get.drop(1) :+ numPreShufflePartitions + } + + override def getDependencies: Seq[Dependency[_]] = List(dependency) + + override def getPartitions: Array[Partition] = { +assert(partitionStartIndices.length == partitionEndIndices.length) +Array.tabulate[Partition](numPostShufflePartitions) { i => + new ShuffledRDDPartition(i) +} + } + + override def getPreferredLocations(partition: Partition): Seq[String] = { +val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] +val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]] 
+tracker.getMapLocation(dep, partition.index, partition.index + 1) + } + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +val shuffledRowPartition = split.asInstanceOf[ShuffledRDDPartition] +val mapId = shuffledRowPartition.index +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator, +// as well as the `tempMetrics` for basic shuffle metrics. +val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics) +// Connect the the InternalRows read by each ShuffleReader +new Iterator[InternalRow] { + val readers = partitionStartIndices.zip(partitionEndIndices).map { case (start, end) => Review comment: I get your point that some shuffle blocks are empty and we should skip them, but I think this
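A side note on the code under review: the hand-written `Iterator` that connects the per-range readers could likely be written with `Iterator.flatMap`. That version opens each range lazily and yields nothing for empty blocks. In the sketch below, `readRange` is a hypothetical stand-in for one shuffle reader, and plain `Int`s stand in for `InternalRow`s:

```scala
object ChainedReaders {
  // Hypothetical stand-in for "read reduce partitions [start, end) of one mapper";
  // blocks(r) holds the rows this mapper wrote for reducer r.
  def readRange(blocks: Vector[Vector[Int]], start: Int, end: Int): Iterator[Int] =
    (start until end).iterator.flatMap(r => blocks(r).iterator)

  // Concatenate one reader per (start, end) range. Because Iterator.flatMap is
  // lazy, a range is only opened after the previous one is exhausted.
  def readAll(blocks: Vector[Vector[Int]], starts: Array[Int], ends: Array[Int]): Iterator[Int] =
    starts.iterator.zip(ends.iterator).flatMap { case (s, e) => readRange(blocks, s, e) }
}
```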
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325542826 ## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ## @@ -629,6 +645,35 @@ private[spark] class MapOutputTrackerMaster( None } + /** + * Return the locations where the Mapper(s) ran. The locations each includes both a host and an + * executor id on that host. + * + * @param dep shuffle dependency object + * @param startMapId the start map id + * @param endMapId the end map id + * @return a sequence of locations that each includes both a host and an executor id on that Review comment: `includes both a host and an executor id` is confusing. We can just say `task location string (please refer to TaskLocation)`
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325540713 ## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ## @@ -629,6 +645,35 @@ private[spark] class MapOutputTrackerMaster( None } + /** + * Return the locations where the Mapper(s) ran. The locations each includes both a host and an + * executor id on that host. + * + * @param dep shuffle dependency object + * @param startMapId the start map id + * @param endMapId the end map id + * @return a sequence of locations that each includes both a host and an executor id on that Review comment: Why not return `ExecutorCacheTaskLocation`?
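These two suggestions meet at one point. A typed location such as `ExecutorCacheTaskLocation` keeps the host and the executor id as separate fields. `RDD.getPreferredLocations`, however, returns `Seq[String]`, so the typed value has to be rendered as a string somewhere. The sketch below is a hypothetical model of that round trip, and all names in it are ours. It assumes the `executor_<host>_<executorId>` format, which is our understanding of what Spark's `TaskLocation` uses for executor locations:

```scala
object TaskLocationModel {
  // Assumed prefix for executor locations (our understanding of Spark's convention).
  val ExecutorTag = "executor_"

  // Hypothetical typed location: host and executor id stay separate fields.
  final case class ExecutorLocation(host: String, executorId: String) {
    // Render as a single preferred-location string.
    override def toString: String = s"$ExecutorTag${host}_$executorId"
  }

  // Parse the string form back. Split on the first '_' after the tag, which
  // assumes host names contain no '_' (executor ids may).
  def parse(s: String): Option[ExecutorLocation] = {
    if (!s.startsWith(ExecutorTag)) {
      None
    } else {
      s.stripPrefix(ExecutorTag).split("_", 2) match {
        case Array(host, executorId) => Some(ExecutorLocation(host, executorId))
        case _ => None
      }
    }
  }
}
```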
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325539965 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark._ +import org.apache.spark.rdd.{RDD, ShuffledRDDPartition} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} + +/** + * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used + * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime + * because the map output of one input table is small enough for broadcast. This RDD represents the + * data of another input table of the join that reads from shuffle. Each partition of the RDD reads + * the whole data from just one mapper output locally. So actually there is no data transferred + * from the network. 
+ + * This RDD takes a [[ShuffleDependency]] (`dependency`). + * + * The `dependency` has the parent RDD of this RDD, which represents the dataset before shuffle + * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs. + * Partition ids should be in the range [0, numPartitions - 1]. + * `dependency.partitioner.numPartitions` is the number of pre-shuffle partitions. (i.e. the number + * of partitions of the map output). The post-shuffle partition number is the same to the parent + * RDD's partition number. + */ +class LocalShuffledRowRDD( + var dependency: ShuffleDependency[Int, InternalRow, InternalRow], + metrics: Map[String, SQLMetric], + specifiedPartitionStartIndices: Option[Array[Int]] = None, + specifiedPartitionEndIndices: Option[Array[Int]] = None) + extends RDD[InternalRow](dependency.rdd.context, Nil) { + + private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions + private[this] val numPostShufflePartitions = dependency.rdd.partitions.length Review comment: The name is wrong. This is the # of mappers and thus should be called `numPreShufflePartitions`.
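A minimal sketch of the naming the reviewer asks for, using a hypothetical `ShuffleShape` model rather than Spark's classes. It assumes `numMapTasks` stands in for `dependency.rdd.partitions.length` (one partition per mapper) and `numReducePartitions` for `dependency.partitioner.numPartitions`. Under that assumption, a reader that consumes one mapper's whole output per task has as many tasks as there are pre-shuffle partitions.

```scala
// Hypothetical model for illustration only; not part of Spark's API.
final case class ShuffleShape(numMapTasks: Int, numReducePartitions: Int) {
  require(numMapTasks >= 0 && numReducePartitions >= 0, "partition counts must be non-negative")

  // Before the shuffle there is one partition per map task (mapper),
  // which is what `dependency.rdd.partitions.length` reports.
  def numPreShufflePartitions: Int = numMapTasks

  // After the shuffle there is one partition per reducer,
  // which is what `dependency.partitioner.numPartitions` reports.
  def numPostShufflePartitions: Int = numReducePartitions

  // A local shuffle reader that reads one mapper's full output per task
  // therefore has as many tasks as there are pre-shuffle partitions.
  def numLocalReaderTasks: Int = numPreShufflePartitions
}
```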
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325538188 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark._ +import org.apache.spark.rdd.{RDD, ShuffledRDDPartition} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} + +/** + * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used + * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime + * because the map output of one input table is small enough for broadcast. This RDD represents the + * data of another input table of the join that reads from shuffle. Each partition of the RDD reads + * the whole data from just one mapper output locally. So actually there is no data transferred + * from the network. 
+ + * This RDD takes a [[ShuffleDependency]] (`dependency`). + * + * The `dependency` has the parent RDD of this RDD, which represents the dataset before shuffle + * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs. + * Partition ids should be in the range [0, numPartitions - 1]. + * `dependency.partitioner.numPartitions` is the number of pre-shuffle partitions. (i.e. the number + * of partitions of the map output). The post-shuffle partition number is the same to the parent + * RDD's partition number. + */ +class LocalShuffledRowRDD( + var dependency: ShuffleDependency[Int, InternalRow, InternalRow], + metrics: Map[String, SQLMetric], + specifiedPartitionStartIndices: Option[Array[Int]] = None, + specifiedPartitionEndIndices: Option[Array[Int]] = None) Review comment: Then let's add it when you propose this optimization. Personally, I think it may be beneficial to keep empty tasks, so that the local shuffle reader node can retain the output partitioning of the original plan and help us eliminate shuffles.
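The argument for keeping empty tasks can be illustrated with a toy hash partitioning. This is a sketch under stated assumptions: a key lands in partition `Math.floorMod(key.hashCode, n)` (a simplification, not Spark's Murmur3-based `HashPartitioning`), and the partitioning guarantee is "partition i holds exactly bucket i". The object and helper names are hypothetical. Dropping empty partitions shifts the remaining indices, so the guarantee breaks and a downstream operator would need a fresh shuffle.

```scala
// Hypothetical helpers for illustration; not Spark's API.
object EmptyPartitionSketch {
  // Simplified hash partitioning (Spark itself uses a Murmur3 hash).
  def bucketOf(key: Int, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Build exactly `numPartitions` partitions; partition i holds the keys of bucket i
  // and may be empty.
  def partition(keys: Seq[Int], numPartitions: Int): Vector[Seq[Int]] =
    Vector.tabulate(numPartitions)(b => keys.filter(k => bucketOf(k, numPartitions) == b))

  // Skipping empty tasks corresponds to removing empty partitions, which shifts indices.
  def dropEmpty(parts: Vector[Seq[Int]]): Vector[Seq[Int]] = parts.filter(_.nonEmpty)

  // The property a downstream operator relies on: every row in partition i hashes to bucket i.
  def isHashPartitioned(parts: Vector[Seq[Int]], numPartitions: Int): Boolean =
    parts.zipWithIndex.forall { case (rows, i) =>
      rows.forall(k => bucketOf(k, numPartitions) == i)
    }
}
```

With keys `1, 3, 5` and four partitions, partitions 0 and 2 are empty; keeping them preserves the index-to-bucket mapping, while dropping them does not.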
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325536699 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = { +shuffleStage match { + case stage: ShuffleQueryStageExec => +stage.isLocalShuffle = false + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => +stage.isLocalShuffle = false +} +shuffleStage + } + + private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = { +val revertPlan = newPlan.transformUp { + case localReader: LocalShuffleReaderExec +if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) => +setIsLocalToFalse(localReader.child) +} +revertPlan + } + + override def apply(plan: SparkPlan): SparkPlan = { +// Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return. 
+val bhjs = plan.collect { + case bhj: BroadcastHashJoinExec => bhj +} + +if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) { + return plan +} + +// If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true +bhjs.map { + case bhj: BroadcastHashJoinExec => +bhj.children map { + case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => +stage.isLocalShuffle = true + case plan: SparkPlan => plan +} +} + +// Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true +val newPlan = plan.transformUp { + case stage: ShuffleQueryStageExec if (stage.isLocalShuffle) => +LocalShuffleReaderExec(stage) + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) if (stage.isLocalShuffle) => +LocalShuffleReaderExec(stage) +} + +val afterEnsureRequirements = EnsureRequirements(conf).apply(newPlan) +val numExchanges = afterEnsureRequirements.collect { + case e: ShuffleExchangeExec => e +}.length +if (numExchanges > 0) { + logWarning("Local shuffle reader optimization is not applied due" + +" to additional shuffles will be introduced.") + revertLocalShuffleReader(newPlan) +} else { + newPlan +} + } +} + +case class LocalShuffleReaderExec( +child: QueryStageExec) extends UnaryExecNode { Review comment: We can make it a leaf node to hide its `QueryStageExec`. We don't expect any other rules to change the underlying shuffle stage.
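Why a leaf node hides the stage: tree rewrites only walk a node's `children`, so a stage held as a plain field of a leaf is never visited. The sketch below models this with a hypothetical miniature tree (`Node`, `UnaryReader`, `LeafReader` are made-up names) and a bottom-up rewrite in the spirit of `TreeNode.transformUp`; it is not Spark's implementation.

```scala
// Hypothetical miniature plan tree for illustration; not Spark's TreeNode.
object LeafReaderSketch {
  sealed trait Node {
    def children: Seq[Node]
    def withNewChildren(newChildren: Seq[Node]): Node
  }

  final case class Stage(id: Int) extends Node {
    def children: Seq[Node] = Nil
    def withNewChildren(newChildren: Seq[Node]): Node = this
  }

  // Unary wrapper: the stage is a child, so generic tree rules can rewrite it.
  final case class UnaryReader(child: Node) extends Node {
    def children: Seq[Node] = Seq(child)
    def withNewChildren(newChildren: Seq[Node]): Node = copy(child = newChildren.head)
  }

  // Leaf wrapper: the stage is an ordinary field, not a child, so tree rules never visit it.
  final case class LeafReader(stage: Stage) extends Node {
    def children: Seq[Node] = Nil
    def withNewChildren(newChildren: Seq[Node]): Node = this
  }

  // Bottom-up rewrite that only descends through `children`.
  def transformUp(node: Node)(rule: PartialFunction[Node, Node]): Node = {
    val rebuilt = node.withNewChildren(node.children.map(c => transformUp(c)(rule)))
    rule.applyOrElse(rebuilt, (n: Node) => n)
  }
}
```

A rule that rewrites `Stage` nodes changes the stage under `UnaryReader` but leaves the one inside `LeafReader` untouched.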
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325536116 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = { +shuffleStage match { + case stage: ShuffleQueryStageExec => +stage.isLocalShuffle = false + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => +stage.isLocalShuffle = false +} +shuffleStage + } + + private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = { +val revertPlan = newPlan.transformUp { + case localReader: LocalShuffleReaderExec +if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) => +setIsLocalToFalse(localReader.child) +} +revertPlan + } + + override def apply(plan: SparkPlan): SparkPlan = { +// Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return. 
+val bhjs = plan.collect { + case bhj: BroadcastHashJoinExec => bhj +} + +if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) { + return plan +} + +// If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true +bhjs.map { + case bhj: BroadcastHashJoinExec => +bhj.children map { + case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => +stage.isLocalShuffle = true + case plan: SparkPlan => plan +} +} + +// Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true +val newPlan = plan.transformUp { + case stage: ShuffleQueryStageExec if (stage.isLocalShuffle) => +LocalShuffleReaderExec(stage) + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) if (stage.isLocalShuffle) => +LocalShuffleReaderExec(stage) +} + +val afterEnsureRequirements = EnsureRequirements(conf).apply(newPlan) +val numExchanges = afterEnsureRequirements.collect { + case e: ShuffleExchangeExec => e +}.length +if (numExchanges > 0) { + logWarning("Local shuffle reader optimization is not applied due" + +" to additional shuffles will be introduced.") + revertLocalShuffleReader(newPlan) +} else { + newPlan +} + } +} + +case class LocalShuffleReaderExec( +child: QueryStageExec) extends UnaryExecNode { + + override def output: Seq[Attribute] = child.output + + override def doCanonicalize(): SparkPlan = child.canonicalized + + override def outputPartitioning: Partitioning = { Review comment: Shouldn't this be `child.outputPartitioning`?
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325533208 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ## @@ -91,6 +91,7 @@ case class AdaptiveSparkPlanExec( // optimizations should be stage-independent. @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReuseAdaptiveSubquery(conf, subqueryCache), +OptimizedLocalShuffleReader(conf), Review comment: Since this may change the number of exchanges, we should put it in `queryStagePreparationRules`. Then the AQE framework can check the cost and give up the optimization if extra exchanges are introduced. Note that making it a physical rule and checking the number of exchanges is suboptimal. It's possible that the local shuffle reader can avoid exchanges downstream, which changes the stage boundaries.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325533208 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ## @@ -91,6 +91,7 @@ case class AdaptiveSparkPlanExec( // optimizations should be stage-independent. @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReuseAdaptiveSubquery(conf, subqueryCache), +OptimizedLocalShuffleReader(conf), Review comment: Since this may change the number of exchanges, we should put it in `queryStagePreparationRules`. Then the AQE framework can check the cost and give up the optimization if extra exchanges are introduced. Note that the current approach (checking the number of exchanges at the end of the rule) is suboptimal. It's possible that the local shuffle reader can avoid exchanges downstream, which changes the stage boundaries.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325533208 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ## @@ -91,6 +91,7 @@ case class AdaptiveSparkPlanExec( // optimizations should be stage-independent. @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReuseAdaptiveSubquery(conf, subqueryCache), +OptimizedLocalShuffleReader(conf), Review comment: Since this may change the number of exchanges, we should put it in https://github.com/apache/spark/pull/25295/files#diff-6954dd8020a9ca298f1fb9602c0e831cR77 instead. Then the AQE framework can check the cost and give up the optimization if extra exchanges are introduced. Note that making it a physical rule and checking the number of exchanges is suboptimal. It's possible that the local shuffle reader can avoid exchanges downstream, which changes the stage boundaries.
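A toy version of the framework-level check described in this comment, assuming a cost model that simply counts exchange nodes across the whole plan. The `Plan`, `Scan`, `Exchange`, and `Join` types are hypothetical stand-ins, not Spark's `SparkPlan` classes, and this is not AQE's actual cost evaluator.

```scala
// Hypothetical plan types and cost model for illustration; not Spark's API.
object ExchangeCostSketch {
  sealed trait Plan { def children: Seq[Plan] }
  final case class Scan(table: String) extends Plan { def children: Seq[Plan] = Nil }
  final case class Exchange(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
  final case class Join(left: Plan, right: Plan) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
  }

  // Toy cost: the number of exchange (shuffle) nodes anywhere in the plan.
  def numExchanges(plan: Plan): Int = {
    val self = plan match {
      case _: Exchange => 1
      case _           => 0
    }
    self + plan.children.map(numExchanges).sum
  }

  // Keep the re-planned candidate unless it introduces extra exchanges overall.
  def chooseCheaper(current: Plan, candidate: Plan): Plan =
    if (numExchanges(candidate) > numExchanges(current)) current else candidate
}
```

Because the comparison is made on the whole re-planned query rather than inside one rule, a rewrite that adds an exchange in one place but removes more elsewhere would still be accepted.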
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325530214 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = { +shuffleStage match { + case stage: ShuffleQueryStageExec => +stage.isLocalShuffle = false + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => +stage.isLocalShuffle = false +} +shuffleStage + } + + private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = { +val revertPlan = newPlan.transformUp { + case localReader: LocalShuffleReaderExec +if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) => +setIsLocalToFalse(localReader.child) +} +revertPlan + } + + override def apply(plan: SparkPlan): SparkPlan = { +// Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return. 
+val bhjs = plan.collect { + case bhj: BroadcastHashJoinExec => bhj +} + +if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) { + return plan +} + +// If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true +bhjs.map { + case bhj: BroadcastHashJoinExec => +bhj.children map { + case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => +stage.isLocalShuffle = true + case plan: SparkPlan => plan +} +} + +// Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true +val newPlan = plan.transformUp { + case stage: ShuffleQueryStageExec if (stage.isLocalShuffle) => +LocalShuffleReaderExec(stage) + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) if (stage.isLocalShuffle) => Review comment: Let's not strip the `ReusedQueryStageExec`.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325529240 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = { +shuffleStage match { + case stage: ShuffleQueryStageExec => +stage.isLocalShuffle = false + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => +stage.isLocalShuffle = false +} +shuffleStage + } + + private def revertLocalShuffleReader(newPlan: SparkPlan): SparkPlan = { +val revertPlan = newPlan.transformUp { + case localReader: LocalShuffleReaderExec +if (ShuffleQueryStageExec.isShuffleQueryStageExec(localReader.child)) => +setIsLocalToFalse(localReader.child) +} +revertPlan + } + + override def apply(plan: SparkPlan): SparkPlan = { +// Collect the `BroadcastHashJoinExec` nodes and if isEmpty directly return. 
+val bhjs = plan.collect { + case bhj: BroadcastHashJoinExec => bhj +} + +if (!conf.optimizedLocalShuffleReaderEnabled || bhjs.isEmpty) { + return plan +} + +// If the streamedPlan is `ShuffleQueryStageExec`, set the value of `isLocalShuffle` to true +bhjs.map { + case bhj: BroadcastHashJoinExec => +bhj.children map { + case stage: ShuffleQueryStageExec => stage.isLocalShuffle = true + case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => +stage.isLocalShuffle = true + case plan: SparkPlan => plan +} +} + +// Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true +val newPlan = plan.transformUp { Review comment: Why don't we traverse the tree once?
```
def isShuffleStage(plan: SparkPlan): Boolean = plan match {
  case _: ShuffleQueryStageExec => true
  case ReusedQueryStageExec(_, _: ShuffleQueryStageExec, _) => true
  case _ => false
}

def canUseLocalShuffleReaderLeft(j: BroadcastHashJoinExec): Boolean = {
  j.buildSide == BuildLeft && isShuffleStage(j.left)
}

def canUseLocalShuffleReaderRight ...

...

plan transformDown {
  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) =>
    val localShuffleReader = ...
    join.copy(left = localShuffleReader)
  ...
}
```
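The single-pass shape of this suggestion can be sketched on a hypothetical miniature plan tree (every type name below is made up for illustration; this is not Spark's `SparkPlan`). The sketch assumes the local reader goes on the streamed (non-build) side, as the PR's inline comment about `streamedPlan` describes; which side qualifies is a policy choice, so swap the predicates if a different side is intended. It also keeps the reused-stage wrapper inside the reader rather than stripping it, and sets no mutable flags on the stages.

```scala
// Hypothetical miniature plan tree for illustration; not Spark's classes.
object SingleTraversalSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  sealed trait Plan {
    def children: Seq[Plan]
    def withNewChildren(newChildren: Seq[Plan]): Plan
  }

  sealed trait LeafPlan extends Plan {
    def children: Seq[Plan] = Nil
    def withNewChildren(newChildren: Seq[Plan]): Plan = this
  }

  final case class Scan(table: String) extends LeafPlan
  final case class ShuffleStage(id: Int) extends LeafPlan
  // Wrapper for a reused stage; kept intact rather than stripped.
  final case class ReusedStage(stage: Plan) extends LeafPlan
  final case class LocalShuffleReader(stage: Plan) extends LeafPlan

  final case class Filter(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withNewChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
  }

  final case class BroadcastHashJoin(left: Plan, right: Plan, buildSide: BuildSide) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
    def withNewChildren(newChildren: Seq[Plan]): Plan =
      copy(left = newChildren(0), right = newChildren(1))
  }

  def isShuffleStage(plan: Plan): Boolean = plan match {
    case _: ShuffleStage              => true
    case ReusedStage(_: ShuffleStage) => true
    case _                            => false
  }

  // Top-down rewrite: apply the rule to a node, then recurse into its (possibly new) children.
  def transformDown(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val afterRule = rule.applyOrElse(plan, (p: Plan) => p)
    afterRule.withNewChildren(afterRule.children.map(c => transformDown(c)(rule)))
  }

  // One pass, no mutable state: wrap the streamed side in a local reader
  // when that side is a (possibly reused) shuffle stage.
  def optimize(plan: Plan): Plan = transformDown(plan) {
    case j @ BroadcastHashJoin(left, _, BuildRight) if isShuffleStage(left) =>
      j.copy(left = LocalShuffleReader(left))
    case j @ BroadcastHashJoin(_, right, BuildLeft) if isShuffleStage(right) =>
      j.copy(right = LocalShuffleReader(right))
  }
}
```

Each join is inspected exactly once, and the decision is expressed as an immutable `copy` rather than a flag that has to be reverted later.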
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325526025 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec} +import org.apache.spark.sql.internal.SQLConf + +case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { + + private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = { +shuffleStage match { + case stage: ShuffleQueryStageExec => +stage.isLocalShuffle = false Review comment: If possible, let's not add mutable state to the plan.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r325525673 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {

Review comment:
nit: this should be a verb: `OptimizeLocalShuffleReader`
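The naming nit asks for the rule to be named after the action it performs, like `ReduceNumShufflePartitions` and `EnsureRequirements` elsewhere in this PR. The sketch below shows the shape such a verb-named rule might take on a hypothetical miniature plan tree (`Scan`, `ShuffleReader`, `LocalShuffleReader`, `BroadcastHashJoin` are stand-ins, not Spark's `SparkPlan` nodes). That the rule rewrites the shuffle reader on the non-broadcast (probe) side of the join is an assumption drawn from the PR title; the real rule may do more, and this sketch omits anything beyond a single pattern match.

```scala
// Hypothetical miniature plan tree; Spark's SparkPlan and Rule are far richer.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class ShuffleReader(stageId: Int) extends Plan
final case class LocalShuffleReader(stageId: Int) extends Plan
// Assumed layout: `streamed` is the probe side, `broadcast` is the build side.
final case class BroadcastHashJoin(streamed: Plan, broadcast: Plan) extends Plan

// Same shape as a Catalyst rule: a function from a plan tree to a plan tree.
trait Rule[T] {
  def apply(plan: T): T
}

// Verb-named rule: the name says what the rule does to the plan.
object OptimizeLocalShuffleReader extends Rule[Plan] {
  def apply(plan: Plan): Plan = plan match {
    // Assumption: only a shuffle reader feeding the probe side is made local.
    case BroadcastHashJoin(ShuffleReader(id), build) =>
      BroadcastHashJoin(LocalShuffleReader(id), apply(build))
    // Otherwise recurse into both children of the join.
    case BroadcastHashJoin(streamed, build) =>
      BroadcastHashJoin(apply(streamed), apply(build))
    // Leaves (scans, readers) are returned unchanged.
    case other => other
  }
}

val before = BroadcastHashJoin(ShuffleReader(0), ShuffleReader(1))
val after = OptimizeLocalShuffleReader(before)
println(s"$before\n  -> $after")
```

Because the rule is an `object` with `apply`, a call site reads as an instruction, `OptimizeLocalShuffleReader(plan)`, which is the point of the naming suggestion.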
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r323609170

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala ##
@@ -180,25 +180,45 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
 case class CoalescedShuffleReaderExec(
     child: QueryStageExec,
-    partitionStartIndices: Array[Int]) extends UnaryExecNode {
+    partitionStartIndices: Array[Int],
+    var isLocal: Boolean = false) extends UnaryExecNode {

Review comment:
Without the local shuffle reader, a task of `ShuffledRDD` reads the shuffle blocks `map1-reduce1`, `map2-reduce1`, etc. With the local shuffle reader, the task reads `map1-reduce1`, `map1-reduce2`, etc. The task output data size is therefore different, and we can't use the algorithm in `ReduceNumShufflePartitions` anymore. Furthermore, the RDD's numPartitions also changes after switching to the local shuffle reader, so how could we apply `ReduceNumShufflePartitions`?
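The two access patterns described above can be made concrete with a small sketch. Shuffle blocks are modeled as a hypothetical `sizes(mapId)(reduceId)` matrix with made-up byte counts; the helper names are illustrative only, and indices are zero-based, so the comment's `map1-reduce1` is block `(0, 0)` here. A regular reduce task `r` reads column `r` across all mappers, while a local reader task `m` reads row `m` across all reducers, so both the number of tasks and the bytes per task change even though the total data is the same.

```scala
// Hypothetical shuffle block sizes in bytes, indexed as sizes(mapId)(reduceId).
// The numbers are illustrative only.
val sizes: Vector[Vector[Long]] = Vector(
  Vector(10L, 20L, 30L, 40L), // map 0
  Vector(5L, 5L, 5L, 5L),     // map 1
  Vector(100L, 0L, 0L, 0L)    // map 2
)
val numMappers: Int = sizes.length
val numReducers: Int = sizes.head.length

// Regular shuffle reader: reduce task r fetches block (m, r) from every mapper m.
def regularTaskBlocks(r: Int): Seq[(Int, Int)] = (0 until numMappers).map(m => (m, r))

// Local shuffle reader: task m reads block (m, r) for every reducer r of one mapper.
def localTaskBlocks(m: Int): Seq[(Int, Int)] = (0 until numReducers).map(r => (m, r))

// Total bytes a task reads for a given list of (mapId, reduceId) blocks.
def bytesRead(blocks: Seq[(Int, Int)]): Long =
  blocks.map { case (m, r) => sizes(m)(r) }.sum

val regularTaskSizes: Seq[Long] = (0 until numReducers).map(r => bytesRead(regularTaskBlocks(r)))
val localTaskSizes: Seq[Long] = (0 until numMappers).map(m => bytesRead(localTaskBlocks(m)))

println(s"regular reader: ${regularTaskSizes.length} tasks, bytes per task = $regularTaskSizes")
println(s"local reader:   ${localTaskSizes.length} tasks, bytes per task = $localTaskSizes")
```

Here the regular reader runs one task per reducer (4) and the local reader one task per mapper (3), which is the change in numPartitions the comment points out.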
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r323223769

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala ##
@@ -180,25 +180,45 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
 case class CoalescedShuffleReaderExec(
     child: QueryStageExec,
-    partitionStartIndices: Array[Int]) extends UnaryExecNode {
+    partitionStartIndices: Array[Int],
+    var isLocal: Boolean = false) extends UnaryExecNode {

Review comment:
`ReduceNumShufflePartitions` and the local shuffle reader are two different optimizations, and they conflict: `ReduceNumShufflePartitions` adjusts numPartitions by assuming the partitions are post-shuffle partitions, whose data size depends on the shuffle blocks they need to read. If we switch the shuffle to a local shuffle reader, the partitions become pre-shuffle partitions, and their data size is different.
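The conflict can be illustrated with a simplified sketch of size-based coalescing in the spirit of `ReduceNumShufflePartitions`: walk the partitions in order, accumulate their sizes, and start a new coalesced partition whenever adding the next one would exceed a target. The function `partitionStartIndicesFor`, its `targetSize` parameter, and the exact greedy rule are assumptions for illustration, not Spark's implementation; the output has the same shape as `partitionStartIndices` in `CoalescedShuffleReaderExec`. The block sizes are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified greedy coalescing; an illustration, not Spark's implementation.
// Returns the index at which each coalesced partition starts.
def partitionStartIndicesFor(partitionSizes: Seq[Long], targetSize: Long): Array[Int] = {
  val starts = ArrayBuffer(0)
  var current = 0L
  partitionSizes.zipWithIndex.foreach { case (size, i) =>
    if (i > 0 && current + size > targetSize) {
      starts += i // close the current group; partition i starts a new one
      current = size
    } else {
      current += size
    }
  }
  starts.toArray
}

// Hypothetical shuffle block sizes in bytes: sizes(mapId)(reduceId).
val sizes: Vector[Vector[Long]] = Vector(
  Vector(10L, 20L, 30L, 40L),
  Vector(5L, 5L, 5L, 5L),
  Vector(100L, 0L, 0L, 0L)
)

// Post-shuffle partition r: sum over all mappers of block (m, r).
val postShuffle: Seq[Long] = sizes.transpose.map(_.sum)
// Pre-shuffle partition m (what a local reader task sees): sum over reducers of block (m, r).
val preShuffle: Seq[Long] = sizes.map(_.sum)

val target = 100L
val postStarts = partitionStartIndicesFor(postShuffle, target)
val preStarts = partitionStartIndicesFor(preShuffle, target)
println(s"post-shuffle sizes $postShuffle -> starts ${postStarts.mkString("[", ", ", "]")}")
println(s"pre-shuffle sizes  $preShuffle -> starts ${preStarts.mkString("[", ", ", "]")}")
```

The post-shuffle indices group reducer IDs, while a local reader's partitions are indexed by mapper ID, so start indices computed for one layout say nothing useful about the other.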