This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 89a988c  [SPARK-38045][SS][TEST][3.2] More strict validation on plan check for stream-stream join unit test
89a988c is described below

commit 89a988cbbbc02acdb7614e7da6d87d19f2c938ea
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Fri Jan 28 16:09:04 2022 +0900

    [SPARK-38045][SS][TEST][3.2] More strict validation on plan check for stream-stream join unit test
    
    ### What changes were proposed in this pull request?
    
    This PR is a follow-up of SPARK-35693 to enhance the unit test on stream-stream join to be more strict on plan check.
    
    ### Why are the changes needed?
    
    We would like to be more strict on plan check so that the requirement of distribution against stream-stream join is fulfilled.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, test only.
    
    ### How was this patch tested?
    
    Modified test passed.
    
    Closes #35347 from HeartSaVioR/SPARK-38045-branch-3.2.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../apache/spark/sql/streaming/StreamingJoinSuite.scala  | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 8c830d3..5ec47bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -28,6 +28,8 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec, StreamingSymmetricHashJoinHelper}
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreProviderId}
@@ -583,9 +585,21 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
       CheckAnswer(1.to(1000): _*),
       Execute { query =>
         // Verify the query plan
+        def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = {
+          expressions.flatMap {
+            case ref: AttributeReference => Some(ref.name)
+          }
+        }
+
+        val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
+
         assert(query.lastExecution.executedPlan.collect {
           case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _,
-            _: ShuffleExchangeExec, ShuffleExchangeExec(_, _: ShuffleExchangeExec, _)) => j
+            ShuffleExchangeExec(opA: HashPartitioning, _, _),
+            ShuffleExchangeExec(opB: HashPartitioning, _, _))
+              if partitionExpressionsColumns(opA.expressions) === Seq("a", "b")
+                && partitionExpressionsColumns(opB.expressions) === Seq("a", "b")
+                && opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j
         }.size == 1)
       })
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to