HeartSaVioR commented on pull request #35341: URL: https://github.com/apache/spark/pull/35341#issuecomment-1023250776
I tried a managed table backed by a file source that supports bucketing, but it looks like the file stream source does not pick up the bucket info even when it goes through the managed table, hence the output partitioning of the source is `UnknownPartitioning`.

```scala
test("aaa - streaming join should require HashClusteredDistribution from children") {
  withTempDir { dir =>
    val randPostfix = Random.nextInt(100000)
    val tbl1Name = s"tbl1a$randPostfix"
    val tbl2Name = s"tbl2a$randPostfix"

    val batchDf = 1.to(1000).toDF("value")
      .select('value as 'a, 'value * 2 as 'b, 'value * 3 as 'c)
    batchDf.write.bucketBy(7, "a", "b").mode(SaveMode.Overwrite).saveAsTable(tbl1Name)

    val batchDf2 = 1.to(1000).toDF("value")
      .select('value as 'a, 'value * 3 as 'b, 'value * 4 as 'c)
    batchDf2.write.bucketBy(7, "a", "b").mode(SaveMode.Overwrite).saveAsTable(tbl2Name)

    sql(s"DESCRIBE EXTENDED $tbl1Name").show(numRows = 21, truncate = false)
    sql(s"DESCRIBE EXTENDED $tbl2Name").show(numRows = 21, truncate = false)

    val input1 = spark.readStream.table(tbl1Name)
    val input2 = spark.readStream.table(tbl2Name)

    val df1 = input1.toDF
    val df2 = input2.toDF // .repartition('b)
    val joined = df1.join(df2, Seq("a", "b")).select('a)

    testStream(joined)(
      // CheckAnswer(1.to(1000): _*),
      ProcessAllAvailable(),
      Execute { query =>
        // Verify the query plan
        def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = {
          expressions.flatMap { case ref: AttributeReference => Some(ref.name) }
        }

        val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
        assert(query.lastExecution.executedPlan.collect {
          case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _,
              ShuffleExchangeExec(opA: HashPartitioning, _, _),
              ShuffleExchangeExec(opB: HashPartitioning, _, _))
            if partitionExpressionsColumns(opA.expressions) === Seq("a", "b") &&
              partitionExpressionsColumns(opB.expressions) === Seq("a", "b") &&
              opA.numPartitions == numPartitions &&
              opB.numPartitions == numPartitions => j
        }.size == 1)
      })
  }
}
```

If we want to
test with SPARK-35703, we may need a source that supports bucketed scan on streaming, and I'm not sure we have one among the built-in sources.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
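As a follow-up to the observation above, here is a minimal sketch (not part of the PR) of how one might inspect the reported output partitioning of the batch scan versus the streaming scan of the same bucketed table. It assumes the `spark` test session and the `tbl1Name` table created in the test above; the exact leaf plan shape can vary by Spark version and bucketed-scan configuration:

```scala
// Sketch: compare the output partitioning reported by batch vs. streaming
// scans of the same bucketed table. Assumes `spark` and `tbl1Name` exist
// as in the test above; this is an illustration, not a verified repro.
val batchLeaves = spark.table(tbl1Name).queryExecution.executedPlan.collectLeaves()
// With bucketed scan enabled, a batch FileSourceScanExec may report
// HashPartitioning over (a, b) with 7 partitions; print to confirm.
batchLeaves.foreach(p => println(p.outputPartitioning))

// On the streaming side the physical plan only exists per micro-batch, so
// the same check has to run inside Execute { query => ... } as above:
//   query.lastExecution.executedPlan.collectLeaves()
//     .foreach(p => println(p.outputPartitioning))
// which, per the observation in this comment, reports UnknownPartitioning.
```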