This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 6b5a1f9df28 Revert "[SPARK-38354][SQL] Add hash probes metric for shuffled hash join"
6b5a1f9df28 is described below

commit 6b5a1f9df28262fa90d28dc15af67e8a37a9efcf
Author: Cheng Su <chen...@fb.com>
AuthorDate: Tue Apr 26 16:12:35 2022 -0700

    Revert "[SPARK-38354][SQL] Add hash probes metric for shuffled hash join"

    This reverts commit 158436655f30141bbd5afa8d95aec66282a5c4b4, as the original PR
    caused performance regression reported in
    https://github.com/apache/spark/pull/35686#issuecomment-1107807027 .

    Closes #36338 from c21/revert-metrics.

    Authored-by: Cheng Su <chen...@fb.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java   |  2 +-
 .../execution/UnsafeFixedWidthAggregationMap.java  |  6 ++--
 .../execution/aggregate/HashAggregateExec.scala    |  4 +--
 .../aggregate/TungstenAggregationIterator.scala    |  2 +-
 .../spark/sql/execution/joins/HashedRelation.scala | 35 ----------------------
 .../sql/execution/joins/ShuffledHashJoinExec.scala | 10 ++-----
 .../sql/execution/metric/SQLMetricsSuite.scala     | 16 +++++-----
 7 files changed, 16 insertions(+), 59 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index f4f4052b4fa..f474c30b8b3 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -941,7 +941,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
   /**
    * Returns the average number of probes per key lookup.
    */
-  public double getAvgHashProbesPerKey() {
+  public double getAvgHashProbeBucketListIterations() {
     return (1.0 * numProbes) / numKeyLookups;
   }

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 8587d929007..43e174d7273 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -221,10 +221,10 @@ public final class UnsafeFixedWidthAggregationMap {
   }

   /**
-   * Gets the average number of hash probes per key lookup in the underlying `BytesToBytesMap`.
+   * Gets the average bucket list iterations per lookup in the underlying `BytesToBytesMap`.
    */
-  public double getAvgHashProbesPerKey() {
-    return map.getAvgHashProbesPerKey();
+  public double getAvgHashProbeBucketListIterations() {
+    return map.getAvgHashProbeBucketListIterations();
   }

   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 6c83ba5546d..935844d96d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -68,7 +68,7 @@ case class HashAggregateExec(
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
     "avgHashProbe" ->
-      SQLMetrics.createAverageMetric(sparkContext, "avg hash probes per key"),
+      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"),
     "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of sort fallback tasks"))

   // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
@@ -207,7 +207,7 @@ case class HashAggregateExec(
     metrics.incPeakExecutionMemory(maxMemory)

     // Update average hashmap probe
-    avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
+    avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)

     if (sorter == null) {
       // not spilled
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 36405fe9272..0a5e8838e15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -389,7 +389,7 @@ class TungstenAggregationIterator(
     metrics.incPeakExecutionMemory(maxMemory)

     // Updating average hashmap probe
-    avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
+    avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
   })

   ///////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 253f16e39d3..11d3af4e546 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -110,11 +110,6 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation {
    */
   def keys(): Iterator[InternalRow]

-  /**
-   * Returns the average number of hash probes per key lookup.
-   */
-  def getAvgHashProbesPerKey(): Double
-
   /**
    * Returns a read-only copy of this, to be safely used in current thread.
    */
@@ -226,8 +221,6 @@ private[joins] class UnsafeHashedRelation(

   override def estimatedSize: Long = binaryMap.getTotalMemoryConsumption

-  override def getAvgHashProbesPerKey(): Double = binaryMap.getAvgHashProbesPerKey
-
   // re-used in get()/getValue()/getWithKeyIndex()/getValueWithKeyIndex()/valuesWithKeyIndex()
   var resultRow = new UnsafeRow(numFields)

@@ -575,12 +568,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   // The number of unique keys.
   private var numKeys = 0L

-  // The number of hash probes for keys.
-  private var numProbes = 0L
-
-  // The number of keys lookups.
-  private var numKeyLookups = 0L
-
   // needed by serializer
   def this() = {
     this(
@@ -629,11 +616,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    */
   def getTotalMemoryConsumption: Long = array.length * 8L + page.length * 8L

-  /**
-   * Returns the average number of hash probes per key lookup.
-   */
-  def getAvgHashProbesPerKey: Double = (1.0 * numProbes) / numKeyLookups
-
   /**
    * Returns the first slot of array that store the keys (sparse mode).
    */
@@ -668,9 +650,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    * Returns the single UnsafeRow for given key, or null if not found.
    */
   def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
-    numKeyLookups += 1
     if (isDense) {
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -678,14 +658,12 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
         }
       }
     } else {
-      numProbes += 1
       var pos = firstSlot(key)
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return getRow(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -712,9 +690,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    * Returns an iterator for all the values for the given key, or null if no value found.
    */
   def get(key: Long, resultRow: UnsafeRow): Iterator[UnsafeRow] = {
-    numKeyLookups += 1
     if (isDense) {
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -722,14 +698,12 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
         }
       }
     } else {
-      numProbes += 1
       var pos = firstSlot(key)
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return valueIter(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -808,13 +782,10 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    * Update the address in array for given key.
    */
   private def updateIndex(key: Long, address: Long): Unit = {
-    numKeyLookups += 1
-    numProbes += 1
     var pos = firstSlot(key)
     assert(numKeys < array.length / 2)
     while (array(pos) != key && array(pos + 1) != 0) {
       pos = nextSlot(pos)
-      numProbes += 1
     }
     if (array(pos + 1) == 0) {
       // this is the first value for this key, put the address in array.
@@ -1017,8 +988,6 @@ class LongHashedRelation(

   override def estimatedSize: Long = map.getTotalMemoryConsumption

-  override def getAvgHashProbesPerKey(): Double = map.getAvgHashProbesPerKey
-
   override def get(key: InternalRow): Iterator[InternalRow] = {
     if (key.isNullAt(0)) {
       null
@@ -1136,8 +1105,6 @@ case object EmptyHashedRelation extends HashedRelation {
   override def close(): Unit = {}

   override def estimatedSize: Long = 0
-
-  override def getAvgHashProbesPerKey(): Double = 0
 }

 /**
@@ -1164,8 +1131,6 @@ case object HashedRelationWithAllNullKeys extends HashedRelation {
   override def close(): Unit = {}

   override def estimatedSize: Long = 0
-
-  override def getAvgHashProbesPerKey(): Double = 0
 }

 /** The HashedRelationBroadcastMode requires that rows are broadcasted as a HashedRelation. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 38c9c82f77e..cfe35d04778 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -49,8 +49,7 @@ case class ShuffledHashJoinExec(
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
-    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"),
-    "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probes per key"))
+    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))

   override def output: Seq[Attribute] = super[ShuffledJoin].output

@@ -78,7 +77,6 @@ case class ShuffledHashJoinExec(
   def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
     val buildDataSize = longMetric("buildDataSize")
     val buildTime = longMetric("buildTime")
-    val avgHashProbe = longMetric("avgHashProbe")
     val start = System.nanoTime()
     val context = TaskContext.get()
     val relation = HashedRelation(
@@ -91,11 +89,7 @@ case class ShuffledHashJoinExec(
     buildTime += NANOSECONDS.toMillis(System.nanoTime() - start)
     buildDataSize += relation.estimatedSize
     // This relation is usually used until the end of task.
-    context.addTaskCompletionListener[Unit](_ => {
-      // Update average hashmap probe
-      avgHashProbe.set(relation.getAvgHashProbesPerKey())
-      relation.close()
-    })
+    context.addTaskCompletionListener[Unit](_ => relation.close())
     relation
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 37a6d18e516..ee10ae2e334 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -109,11 +109,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     val df = testData2.groupBy().count() // 2 partitions
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
-        "avg hash probes per key" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L),
       Map("number of output rows" -> 1L,
-        "avg hash probes per key" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L))
     val shuffleExpected1 = Map(
@@ -131,11 +131,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     val df2 = testData2.groupBy($"a").count()
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
-        "avg hash probes per key" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L),
      Map("number of output rows" -> 3L,
-        "avg hash probes per key" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L))

@@ -184,7 +184,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       }
       val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
       nodeIds.foreach { nodeId =>
-        val probes = metrics(nodeId)._2("avg hash probes per key").toString
+        val probes = metrics(nodeId)._2("avg hash probe bucket list iters").toString
         if (!probes.contains("\n")) {
           // It's a single metrics value
           assert(probes.toDouble > 1.0)
@@ -372,8 +372,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       val df = df1.join(df2, "key")
       testSparkPlanMetrics(df, 1, Map(
         nodeId1 -> (("ShuffledHashJoin", Map(
-          "number of output rows" -> 2L,
-          "avg hash probes per key" -> aggregateMetricsPattern))),
+          "number of output rows" -> 2L))),
         nodeId2 -> (("Exchange", Map(
           "shuffle records written" -> 2L,
           "records read" -> 2L))),
@@ -402,8 +401,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
         rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType)
       testSparkPlanMetrics(df, 1, Map(
         nodeId -> (("ShuffledHashJoin", Map(
-          "number of output rows" -> rows,
-          "avg hash probes per key" -> aggregateMetricsPattern)))),
+          "number of output rows" -> rows)))),
         enableWholeStage
       )
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org