This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 41d44d162c0 Revert "[SPARK-38354][SQL] Add hash probes metric for 
shuffled hash join"
41d44d162c0 is described below

commit 41d44d162c0b5057fd737bccf450010f6dcd4684
Author: Cheng Su <chen...@fb.com>
AuthorDate: Tue Apr 26 16:12:35 2022 -0700

    Revert "[SPARK-38354][SQL] Add hash probes metric for shuffled hash join"
    
    This reverts commit 158436655f30141bbd5afa8d95aec66282a5c4b4, as the 
original PR caused performance regression reported in 
https://github.com/apache/spark/pull/35686#issuecomment-1107807027 .
    
    Closes #36338 from c21/revert-metrics.
    
    Authored-by: Cheng Su <chen...@fb.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 6b5a1f9df28262fa90d28dc15af67e8a37a9efcf)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java   |  2 +-
 .../execution/UnsafeFixedWidthAggregationMap.java  |  6 ++--
 .../execution/aggregate/HashAggregateExec.scala    |  4 +--
 .../aggregate/TungstenAggregationIterator.scala    |  2 +-
 .../spark/sql/execution/joins/HashedRelation.scala | 35 ----------------------
 .../sql/execution/joins/ShuffledHashJoinExec.scala | 10 ++-----
 .../sql/execution/metric/SQLMetricsSuite.scala     | 16 +++++-----
 7 files changed, 16 insertions(+), 59 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index f4f4052b4fa..f474c30b8b3 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -941,7 +941,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
   /**
    * Returns the average number of probes per key lookup.
    */
-  public double getAvgHashProbesPerKey() {
+  public double getAvgHashProbeBucketListIterations() {
     return (1.0 * numProbes) / numKeyLookups;
   }
 
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 8587d929007..43e174d7273 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -221,10 +221,10 @@ public final class UnsafeFixedWidthAggregationMap {
   }
 
   /**
-   * Gets the average number of hash probes per key lookup in the underlying 
`BytesToBytesMap`.
+   * Gets the average bucket list iterations per lookup in the underlying 
`BytesToBytesMap`.
    */
-  public double getAvgHashProbesPerKey() {
-    return map.getAvgHashProbesPerKey();
+  public double getAvgHashProbeBucketListIterations() {
+    return map.getAvgHashProbeBucketListIterations();
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 6c83ba5546d..935844d96d9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -68,7 +68,7 @@ case class HashAggregateExec(
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in 
aggregation build"),
     "avgHashProbe" ->
-      SQLMetrics.createAverageMetric(sparkContext, "avg hash probes per key"),
+      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list 
iters"),
     "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of 
sort fallback tasks"))
 
   // This is for testing. We force TungstenAggregationIterator to fall back to 
the unsafe row hash
@@ -207,7 +207,7 @@ case class HashAggregateExec(
     metrics.incPeakExecutionMemory(maxMemory)
 
     // Update average hashmap probe
-    avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
+    avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
 
     if (sorter == null) {
       // not spilled
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 36405fe9272..0a5e8838e15 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -389,7 +389,7 @@ class TungstenAggregationIterator(
     metrics.incPeakExecutionMemory(maxMemory)
 
     // Updating average hashmap probe
-    avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
+    avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
   })
 
   ///////////////////////////////////////////////////////////////////////////
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 253f16e39d3..11d3af4e546 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -110,11 +110,6 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
    */
   def keys(): Iterator[InternalRow]
 
-  /**
-   * Returns the average number of hash probes per key lookup.
-   */
-  def getAvgHashProbesPerKey(): Double
-
   /**
    * Returns a read-only copy of this, to be safely used in current thread.
    */
@@ -226,8 +221,6 @@ private[joins] class UnsafeHashedRelation(
 
   override def estimatedSize: Long = binaryMap.getTotalMemoryConsumption
 
-  override def getAvgHashProbesPerKey(): Double = 
binaryMap.getAvgHashProbesPerKey
-
   // re-used in 
get()/getValue()/getWithKeyIndex()/getValueWithKeyIndex()/valuesWithKeyIndex()
   var resultRow = new UnsafeRow(numFields)
 
@@ -575,12 +568,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: 
TaskMemoryManager, cap
   // The number of unique keys.
   private var numKeys = 0L
 
-  // The number of hash probes for keys.
-  private var numProbes = 0L
-
-  // The number of keys lookups.
-  private var numKeyLookups = 0L
-
   // needed by serializer
   def this() = {
     this(
@@ -629,11 +616,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: 
TaskMemoryManager, cap
    */
   def getTotalMemoryConsumption: Long = array.length * 8L + page.length * 8L
 
-  /**
-   * Returns the average number of hash probes per key lookup.
-   */
-  def getAvgHashProbesPerKey: Double = (1.0 * numProbes) / numKeyLookups
-
   /**
    * Returns the first slot of array that store the keys (sparse mode).
    */
@@ -668,9 +650,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: 
TaskMemoryManager, cap
    * Returns the single UnsafeRow for given key, or null if not found.
    */
   def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
-    numKeyLookups += 1
     if (isDense) {
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -678,14 +658,12 @@ private[execution] final class LongToUnsafeRowMap(val mm: 
TaskMemoryManager, cap
         }
       }
     } else {
-      numProbes += 1
       var pos = firstSlot(key)
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return getRow(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -712,9 +690,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: 
TaskMemoryManager, cap
    * Returns an iterator for all the values for the given key, or null if no 
value found.
    */
   def get(key: Long, resultRow: UnsafeRow): Iterator[UnsafeRow] = {
-    numKeyLookups += 1
     if (isDense) {
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -722,14 +698,12 @@ private[execution] final class LongToUnsafeRowMap(val mm: 
TaskMemoryManager, cap
         }
       }
     } else {
-      numProbes += 1
       var pos = firstSlot(key)
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return valueIter(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -808,13 +782,10 @@ private[execution] final class LongToUnsafeRowMap(val mm: 
TaskMemoryManager, cap
    * Update the address in array for given key.
    */
   private def updateIndex(key: Long, address: Long): Unit = {
-    numKeyLookups += 1
-    numProbes += 1
     var pos = firstSlot(key)
     assert(numKeys < array.length / 2)
     while (array(pos) != key && array(pos + 1) != 0) {
       pos = nextSlot(pos)
-      numProbes += 1
     }
     if (array(pos + 1) == 0) {
       // this is the first value for this key, put the address in array.
@@ -1017,8 +988,6 @@ class LongHashedRelation(
 
   override def estimatedSize: Long = map.getTotalMemoryConsumption
 
-  override def getAvgHashProbesPerKey(): Double = map.getAvgHashProbesPerKey
-
   override def get(key: InternalRow): Iterator[InternalRow] = {
     if (key.isNullAt(0)) {
       null
@@ -1136,8 +1105,6 @@ case object EmptyHashedRelation extends HashedRelation {
   override def close(): Unit = {}
 
   override def estimatedSize: Long = 0
-
-  override def getAvgHashProbesPerKey(): Double = 0
 }
 
 /**
@@ -1164,8 +1131,6 @@ case object HashedRelationWithAllNullKeys extends 
HashedRelation {
   override def close(): Unit = {}
 
   override def estimatedSize: Long = 0
-
-  override def getAvgHashProbesPerKey(): Double = 0
 }
 
 /** The HashedRelationBroadcastMode requires that rows are broadcasted as a 
HashedRelation. */
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 38c9c82f77e..cfe35d04778 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -49,8 +49,7 @@ case class ShuffledHashJoinExec(
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
     "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of 
build side"),
-    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build 
hash map"),
-    "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash 
probes per key"))
+    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build 
hash map"))
 
   override def output: Seq[Attribute] = super[ShuffledJoin].output
 
@@ -78,7 +77,6 @@ case class ShuffledHashJoinExec(
   def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
     val buildDataSize = longMetric("buildDataSize")
     val buildTime = longMetric("buildTime")
-    val avgHashProbe = longMetric("avgHashProbe")
     val start = System.nanoTime()
     val context = TaskContext.get()
     val relation = HashedRelation(
@@ -91,11 +89,7 @@ case class ShuffledHashJoinExec(
     buildTime += NANOSECONDS.toMillis(System.nanoTime() - start)
     buildDataSize += relation.estimatedSize
     // This relation is usually used until the end of task.
-    context.addTaskCompletionListener[Unit](_ => {
-      // Update average hashmap probe
-      avgHashProbe.set(relation.getAvgHashProbesPerKey())
-      relation.close()
-    })
+    context.addTaskCompletionListener[Unit](_ => relation.close())
     relation
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 063f1862264..aa746370b8f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -109,11 +109,11 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
     val df = testData2.groupBy().count() // 2 partitions
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
-        "avg hash probes per key" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L),
       Map("number of output rows" -> 1L,
-        "avg hash probes per key" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L))
     val shuffleExpected1 = Map(
@@ -131,11 +131,11 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
     val df2 = testData2.groupBy(Symbol("a")).count()
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
-        "avg hash probes per key" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L),
       Map("number of output rows" -> 3L,
-        "avg hash probes per key" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L))
 
@@ -184,7 +184,7 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
       }
       val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
       nodeIds.foreach { nodeId =>
-        val probes = metrics(nodeId)._2("avg hash probes per key").toString
+        val probes = metrics(nodeId)._2("avg hash probe bucket list 
iters").toString
         if (!probes.contains("\n")) {
           // It's a single metrics value
           assert(probes.toDouble > 1.0)
@@ -372,8 +372,7 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
           val df = df1.join(df2, "key")
           testSparkPlanMetrics(df, 1, Map(
             nodeId1 -> (("ShuffledHashJoin", Map(
-              "number of output rows" -> 2L,
-              "avg hash probes per key" -> aggregateMetricsPattern))),
+              "number of output rows" -> 2L))),
             nodeId2 -> (("Exchange", Map(
               "shuffle records written" -> 2L,
               "records read" -> 2L))),
@@ -402,8 +401,7 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
           rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType)
         testSparkPlanMetrics(df, 1, Map(
           nodeId -> (("ShuffledHashJoin", Map(
-            "number of output rows" -> rows,
-            "avg hash probes per key" -> aggregateMetricsPattern)))),
+            "number of output rows" -> rows)))),
           enableWholeStage
         )
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to