viirya commented on code in PR #53358:
URL: https://github.com/apache/spark/pull/53358#discussion_r2594434400


##########
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala:
##########
@@ -2878,6 +2878,24 @@ class DatasetSuite extends QueryTest
     checkDataset(Seq(seqMutableSet).toDS(), seqMutableSet)
     checkDataset(Seq(mapMutableSet).toDS(), mapMutableSet)
   }
+
+  test("SPARK-54620: Observation should not blocking forever") {
+    val observation = Observation("row_count")
+
+    var df = Seq.empty[(Int, Int)].toDF("v1", "v2")
+    df = df.observe(observation,
+      functions.count(functions.lit(1)).alias("record_cnt"))
+    df = df.repartition($"v1")
+      .select($"v1" + 1 as "v1", $"v2" + 1 as "v2")
+      .join(
+        Seq((1, 2), (3, 4)).toDF("v1", "v2").repartition($"v2"),
+        Seq("v1"),
+        "inner")
+    df.collect()
+
+    val metrics = observation.get

Review Comment:
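   For example, compare these two final query plans. In the first, the join has been replaced by `EmptyRelation` and the `CollectMetrics row_count` node no longer appears anywhere in the plan. In the second, `CollectMetrics row_count` is still present below `ShuffleQueryStage 0`: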
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      ResultQueryStage 2
      +- EmptyRelation [plan_id=10867]
         +- Project [v1#12199, v2#12200, v2#12211]
            +- EmptyRelation Join Inner, (v1#12199 = v1#12210)
   ```
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      ResultQueryStage 3
      +- *(3) Project [v1#12199, v2#12200, v2#12211]
         +- *(3) BroadcastHashJoin [v1#12199], [v1#12210], Inner, BuildLeft, false
            :- BroadcastQueryStage 2
            :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=10929]
            :     +- *(2) Project [(v1#12195 + 1) AS v1#12199, (v2#12196 + 1) AS v2#12200]
            :        +- AQEShuffleRead coalesced
            :           +- ShuffleQueryStage 0
            :              +- Exchange hashpartitioning(v1#12195, 5), REPARTITION_BY_COL, [plan_id=10868]
            :                 +- CollectMetrics row_count, [count(1) AS record_cnt#12197L]
            :                    +- LocalTableScan <empty>, [v1#12195, v2#12196]
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 1
                  +- Exchange hashpartitioning(v2#12211, 5), REPARTITION_BY_COL, [plan_id=10898]
                     +- *(1) Project [_1#12203 AS v1#12210, _2#12204 AS v2#12211]
                        +- *(1) LocalTableScan [_1#12203, _2#12204]
   ```
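   
   A minimal toy model, in Scala, of what the first plan shows. When a join with an empty input is replaced by a single empty node, every node beneath it, including the metrics-collection node, is dropped. Nothing left in the plan can then report `row_count`, which would leave a caller of `observation.get` with no metrics to receive. All names below (`Plan`, `Scan`, `EmptyNode`, `propagateEmpty`, `metricsNodes`) are hypothetical. They do not correspond to Spark's actual plan classes or to its real AQE empty-relation propagation rule; this only sketches the shape of the problem.
   
   ```scala
   // Hypothetical toy plan model for illustration only; these are NOT Spark's
   // LogicalPlan/SparkPlan classes or its real empty-relation propagation rule.
   sealed trait Plan { def children: Seq[Plan] }
   case class Scan(rows: Int) extends Plan { def children: Seq[Plan] = Nil }
   case class CollectMetrics(name: String, child: Plan) extends Plan {
     def children: Seq[Plan] = Seq(child)
   }
   case class Join(left: Plan, right: Plan) extends Plan {
     def children: Seq[Plan] = Seq(left, right)
   }
   case object EmptyNode extends Plan { def children: Seq[Plan] = Nil }
   
   // A subtree yields no rows if it scans nothing, or if it is an inner join
   // with an input that yields no rows. CollectMetrics passes rows through.
   def producesNoRows(p: Plan): Boolean = p match {
     case EmptyNode            => true
     case Scan(rows)           => rows == 0
     case CollectMetrics(_, c) => producesNoRows(c)
     case Join(l, r)           => producesNoRows(l) || producesNoRows(r)
   }
   
   // Toy analogue of empty-relation propagation: an inner join with an empty
   // input is replaced wholesale by EmptyNode, discarding everything below it.
   def propagateEmpty(p: Plan): Plan = p match {
     case j: Join if producesNoRows(j) => EmptyNode
     case Join(l, r)                   => Join(propagateEmpty(l), propagateEmpty(r))
     case CollectMetrics(n, c)         => CollectMetrics(n, propagateEmpty(c))
     case leaf                         => leaf
   }
   
   // Names of all metrics-collection nodes still present in the plan.
   def metricsNodes(p: Plan): Seq[String] = p match {
     case CollectMetrics(n, c) => n +: metricsNodes(c)
     case other                => other.children.flatMap(metricsNodes)
   }
   
   // Empty left input observed as "row_count", inner-joined with a non-empty right input.
   val observed = Join(CollectMetrics("row_count", Scan(0)), Scan(2))
   println(metricsNodes(observed))                 // the metrics node is present
   println(metricsNodes(propagateEmpty(observed))) // after collapsing, it is gone
   ```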



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

