bozhang2820 commented on code in PR #45930:
URL: https://github.com/apache/spark/pull/45930#discussion_r1575960906
##########
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala:
##########
@@ -187,6 +187,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
shuffleBlockResolver.removeDataByMap(shuffleId, mapTaskId)
}
}
+ shuffleBlockResolver.removeShuffleToSkip(shuffleId)
Review Comment:
Yeah, this is a bit weird... I changed it to use a Guava cache with a fixed maximum size (1000).
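For illustration, a size-bounded skip set can be sketched without Guava. This is a hypothetical stdlib analogue, not the code in this PR: `BoundedShuffleSkipSet`, `add`, `remove`, `contains` and `count` are made-up names. It swaps Guava's `CacheBuilder.maximumSize` for a `java.util.LinkedHashMap` whose `removeEldestEntry` drops the oldest insertion once the bound is exceeded. Guava's own size-based eviction policy is different and may evict before the limit is reached.

```scala
import java.util.{Collections, LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical stdlib analogue of the Guava cache in the PR: a thread-safe,
// size-bounded set of shuffle IDs. Once more than `maxSize` IDs are present,
// the oldest insertion is evicted (Guava's eviction policy differs).
final class BoundedShuffleSkipSet(maxSize: Int) {
  private val underlying: JMap[Integer, java.lang.Boolean] =
    Collections.synchronizedMap(
      new JLinkedHashMap[Integer, java.lang.Boolean](16, 0.75f, false) {
        // Called by LinkedHashMap after each insertion; returning true drops the eldest entry.
        override def removeEldestEntry(
            eldest: JMap.Entry[Integer, java.lang.Boolean]): Boolean =
          this.size() > maxSize
      })

  // Mark a shuffle whose blocks should not be reported.
  def add(shuffleId: Int): Unit = {
    underlying.put(Integer.valueOf(shuffleId), java.lang.Boolean.TRUE)
    ()
  }

  // Counterpart of removeShuffleToSkip: forget the ID once the shuffle is unregistered.
  def remove(shuffleId: Int): Unit = {
    underlying.remove(Integer.valueOf(shuffleId))
    ()
  }

  def contains(shuffleId: Int): Boolean =
    underlying.containsKey(Integer.valueOf(shuffleId))

  def count: Int = underlying.size()
}
```

In this sketch the bound caps memory use even if an ID is never explicitly removed, while explicit removal on unregistration keeps the set small in the common case.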
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -869,6 +874,8 @@ case class AdaptiveExecutionContext(session: SparkSession, qe: QueryExecution) {
*/
val stageCache: TrieMap[SparkPlan, ExchangeQueryStageExec] =
new TrieMap[SparkPlan, ExchangeQueryStageExec]()
+
+ val shuffleIds: TrieMap[Int, Boolean] = new TrieMap[Int, Boolean]()
Review Comment:
I think a concurrent hash map is still required, since the context is shared
between the main query and all subqueries?
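To illustrate why the registry needs a concurrent map, here is a minimal sketch assuming the main query and its subqueries register shuffle IDs from different threads against one shared context. `SharedPlanningContext`, `registerShuffle` and `ConcurrentRegistrationDemo` are hypothetical names; only the `TrieMap[Int, Boolean]` field mirrors the diff. A plain `scala.collection.mutable.HashMap` is not thread-safe and could lose entries under the same workload.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical, trimmed-down stand-in for AdaptiveExecutionContext: one instance
// is shared by the main query and its subqueries, which may plan on different
// threads, so writes to the registry must be thread-safe.
final class SharedPlanningContext {
  // Used as a concurrent set of shuffle IDs; the Boolean value is only a placeholder.
  val shuffleIds: TrieMap[Int, Boolean] = new TrieMap[Int, Boolean]()

  def registerShuffle(shuffleId: Int): Unit = shuffleIds.update(shuffleId, true)
}

object ConcurrentRegistrationDemo {
  // Simulates `numQueries` (sub)queries, each registering `perQuery` distinct
  // shuffle IDs from its own thread against the same shared context.
  def run(numQueries: Int, perQuery: Int): SharedPlanningContext = {
    val ctx = new SharedPlanningContext
    val threads = (0 until numQueries).map { q =>
      new Thread(() => (0 until perQuery).foreach(i => ctx.registerShuffle(q * perQuery + i)))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    ctx
  }
}
```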
##########
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:
##########
@@ -76,13 +78,21 @@ private[spark] class IndexShuffleBlockResolver(
override def getStoredShuffles(): Seq[ShuffleBlockInfo] = {
val allBlocks = blockManager.diskBlockManager.getAllBlocks()
allBlocks.flatMap {
- case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+ case ShuffleIndexBlockId(shuffleId, mapId, _)
+ if Option(shuffleIdsToSkip.getIfPresent(shuffleId)).isEmpty =>
Some(ShuffleBlockInfo(shuffleId, mapId))
case _ =>
None
}
}
+ private val shuffleIdsToSkip =
CacheBuilder.newBuilder().maximumSize(1000).build[java.lang.Integer, java.lang.Boolean]()
Review Comment:
Unfortunately, Guava caches don't accept null values...
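A stdlib-only sketch of the filter in `getStoredShuffles`, assuming simplified stand-ins for Spark's block types (the case classes and `StoredShuffleFilter` below are hypothetical and omit most fields). It uses `java.util.concurrent.ConcurrentHashMap` in place of the Guava cache; like a Guava cache, it rejects null keys and values, which is why a set-like map needs a placeholder value such as `Boolean.TRUE`. The PR wraps the lookup in `Option(...)` because Guava's `getIfPresent` returns null for a missing key; with a plain map, `containsKey` expresses the same check.

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified, hypothetical stand-ins for Spark's block ID types.
sealed trait BlockId
final case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
final case class ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
final case class ShuffleBlockInfo(shuffleId: Int, mapId: Long)

// Hypothetical helper mirroring the shape of getStoredShuffles.
final class StoredShuffleFilter {
  // ConcurrentHashMap, like a Guava cache, rejects null values, so the
  // set-like map stores Boolean.TRUE as a placeholder.
  private val shuffleIdsToSkip = new ConcurrentHashMap[Integer, java.lang.Boolean]()

  def skip(shuffleId: Int): Unit = {
    shuffleIdsToSkip.put(Integer.valueOf(shuffleId), java.lang.Boolean.TRUE)
    ()
  }

  // One ShuffleBlockInfo per index block, except for shuffles marked to skip.
  def storedShuffles(allBlocks: Seq[BlockId]): Seq[ShuffleBlockInfo] =
    allBlocks.flatMap {
      case ShuffleIndexBlockId(shuffleId, mapId, _)
          if !shuffleIdsToSkip.containsKey(Integer.valueOf(shuffleId)) =>
        Some(ShuffleBlockInfo(shuffleId, mapId))
      case _ =>
        None
    }
}
```

Non-index blocks fall through to the catch-all case, and skipped shuffles are dropped by the guard.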
--
This is an automated message from the Apache Git Service.