This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 89a988c [SPARK-38045][SS][TEST][3.2] More strict validation on plan check for stream-stream join unit test 89a988c is described below commit 89a988cbbbc02acdb7614e7da6d87d19f2c938ea Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Fri Jan 28 16:09:04 2022 +0900 [SPARK-38045][SS][TEST][3.2] More strict validation on plan check for stream-stream join unit test ### What changes were proposed in this pull request? This PR is a follow-up of SPARK-35693 to enhance the unit test on stream-stream join to be more strict on the plan check. ### Why are the changes needed? We would like to be more strict on the plan check so that the distribution requirement of the stream-stream join is fulfilled. ### Does this PR introduce _any_ user-facing change? No, test only. ### How was this patch tested? Modified test passed. Closes #35347 from HeartSaVioR/SPARK-38045-branch-3.2. Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../apache/spark/sql/streaming/StreamingJoinSuite.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala index 8c830d3..5ec47bb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala @@ -28,6 +28,8 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import 
org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec, StreamingSymmetricHashJoinHelper} import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreProviderId} @@ -583,9 +585,21 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { CheckAnswer(1.to(1000): _*), Execute { query => // Verify the query plan + def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = { + expressions.flatMap { + case ref: AttributeReference => Some(ref.name) + } + } + + val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) + assert(query.lastExecution.executedPlan.collect { case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, - _: ShuffleExchangeExec, ShuffleExchangeExec(_, _: ShuffleExchangeExec, _)) => j + ShuffleExchangeExec(opA: HashPartitioning, _, _), + ShuffleExchangeExec(opB: HashPartitioning, _, _)) + if partitionExpressionsColumns(opA.expressions) === Seq("a", "b") + && partitionExpressionsColumns(opB.expressions) === Seq("a", "b") + && opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j }.size == 1) }) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org