HeartSaVioR commented on pull request #35341:
URL: https://github.com/apache/spark/pull/35341#issuecomment-1023250776


   I tried with a managed table backed by a file source, which supports bucketing, 
but it looks like the file stream source does not pick up the bucket info even 
when it goes through the managed table, hence the output partitioning of the 
source is UnknownPartitioning.
   
   ```
     test("aaa - streaming join should require HashClusteredDistribution from 
children") {
       withTempDir { dir =>
         val randPostfix = Random.nextInt(100000)
         val tbl1Name = s"tbl1a$randPostfix"
         val tbl2Name = s"tbl2a$randPostfix"
   
         val batchDf = 1.to(1000).toDF("value")
           .select('value as 'a, 'value * 2 as 'b, 'value * 3 as 'c)
         batchDf.write.bucketBy(7, "a", 
"b").mode(SaveMode.Overwrite).saveAsTable(tbl1Name)
   
         val batchDf2 = 1.to(1000).toDF("value")
           .select('value as 'a, 'value * 3 as 'b, 'value * 4 as 'c)
         batchDf2.write.bucketBy(7, "a", 
"b").mode(SaveMode.Overwrite).saveAsTable(tbl2Name)
   
         sql(s"DESCRIBE EXTENDED $tbl1Name").show(numRows = 21, truncate = 
false)
         sql(s"DESCRIBE EXTENDED $tbl2Name").show(numRows = 21, truncate = 
false)
   
         val input1 = spark.readStream.table(tbl1Name)
         val input2 = spark.readStream.table(tbl2Name)
   
         val df1 = input1.toDF
         val df2 = input2.toDF // .repartition('b)
         val joined = df1.join(df2, Seq("a", "b")).select('a)
   
         testStream(joined)(
           // CheckAnswer(1.to(1000): _*),
           ProcessAllAvailable(),
           Execute { query =>
             // Verify the query plan
             def partitionExpressionsColumns(expressions: Seq[Expression]): 
Seq[String] = {
               expressions.flatMap {
                 case ref: AttributeReference => Some(ref.name)
               }
             }
   
             val numPartitions = 
spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
   
             assert(query.lastExecution.executedPlan.collect {
               case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _,
               ShuffleExchangeExec(opA: HashPartitioning, _, _),
               ShuffleExchangeExec(opB: HashPartitioning, _, _))
                 if partitionExpressionsColumns(opA.expressions) === Seq("a", 
"b")
                   && partitionExpressionsColumns(opB.expressions) === Seq("a", 
"b")
                   && opA.numPartitions == numPartitions && opB.numPartitions 
== numPartitions => j
             }.size == 1)
           })
       }
     }
   
   ```
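
   For reference, a quick way to confirm the source-side partitioning directly is 
to dump the leaf nodes of the last executed plan. This is a hypothetical extra 
`Execute` action for the test above (`collectLeaves`, `nodeName`, and 
`outputPartitioning` are existing plan APIs):
   
   ```
   Execute { query =>
     // Print the output partitioning each leaf (scan) node reports for the
     // last micro-batch; with the file stream source this shows
     // UnknownPartitioning, i.e. the bucket spec is not propagated.
     query.lastExecution.executedPlan.collectLeaves().foreach { leaf =>
       println(s"${leaf.nodeName}: ${leaf.outputPartitioning}")
     }
   }
   ```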
   
   If we want to test with SPARK-35703, we may need a source that supports 
bucketed scans on streaming. I'm not sure we have one among the built-in sources.
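
   For the record, here is a rough sketch of the DSv2 surface such a source would 
need: a `Scan` that also implements `SupportsReportPartitioning`, so the planner 
can recognize the bucketed layout and avoid the shuffle. This is purely 
illustrative (the class and parameter names are made up), and a real source would 
additionally have to implement `toMicroBatchStream` and actually produce one 
input partition per bucket:
   
   ```
   import org.apache.spark.sql.connector.read.{Scan, SupportsReportPartitioning}
   import org.apache.spark.sql.connector.read.partitioning.{ClusteredDistribution, Distribution, Partitioning}
   import org.apache.spark.sql.types.StructType
   
   // Hypothetical scan that claims its input splits are already clustered
   // by the bucket columns.
   class BucketedStreamingScan(
       schema: StructType,
       bucketColumns: Seq[String],
       buckets: Int) extends Scan with SupportsReportPartitioning {
   
     override def readSchema(): StructType = schema
   
     // One partition per bucket; satisfies any ClusteredDistribution whose
     // clustering keys include all of the bucket columns, so no extra
     // shuffle has to be planned on this side of the join.
     override def outputPartitioning(): Partitioning = new Partitioning {
       override def numPartitions(): Int = buckets
       override def satisfy(distribution: Distribution): Boolean = distribution match {
         case c: ClusteredDistribution => bucketColumns.forall(c.clusteredColumns.contains)
         case _ => false
       }
     }
   }
   ```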

