wangyum commented on pull request #28032:
URL: https://github.com/apache/spark/pull/28032#issuecomment-855549147


   @cloud-fan @HyukjinKwon Could we review 
https://github.com/apache/spark/pull/32781 first? This pr need to add support 
datasource v2 and the test is not very robust. I plan to update this rule to 
something like this:
   ```scala
   object CoalesceOutputFiles extends Rule[LogicalPlan] {
   
     private def resolveColumnNames(
         columnNames: Seq[String], outputAttrs: Seq[Attribute]): 
Seq[NamedExpression] = {
       columnNames.map { c =>
         outputAttrs.resolve(c :: Nil, conf.resolver).
           getOrElse(throw new AnalysisException(s"Cannot resolve column name 
$c among (" +
             s"${outputAttrs.map(_.name).mkString(",")})."))
       }
     }
   
     private def coalesceOutputFiles(
         dataWritingCommand: DataWritingCommand,
         bucketSpec: Option[BucketSpec],
         partitionColumns: Seq[Expression]): LogicalPlan = {
       val query = dataWritingCommand.query
       (bucketSpec, partitionColumns) match {
         case (None, Nil) =>
           query match {
             case _: CoalescePartitions =>
               dataWritingCommand
             case _ =>
               dataWritingCommand.withNewChildInternal(CoalescePartitions(Nil, 
query))
           }
   
         case (None, partExps @ _ +: _) =>
           query match {
             case RepartitionByExpression(partExpressions, _, _) if 
partExpressions == partExps =>
               dataWritingCommand
             case _ =>
               
dataWritingCommand.withNewChildInternal(RepartitionByExpression(partExps, 
query, None))
           }
   
         case (Some(bucket), _) if bucket.sortColumnNames.nonEmpty =>
           val bucketExps = resolveColumnNames(bucket.bucketColumnNames, 
query.output)
           val sortExps = resolveColumnNames(bucket.sortColumnNames, 
query.output)
             .map(SortOrder(_, Ascending))
           query match {
             case Sort(order, false, RepartitionByExpression(partExpr, _, 
Some(bucket.numBuckets)))
               if order == sortExps && partExpr == bucketExps =>
               dataWritingCommand
             case _ =>
               dataWritingCommand.withNewChildInternal(
                 Sort(sortExps, false,
                   RepartitionByExpression(bucketExps, query, 
Some(bucket.numBuckets))))
           }
   
         case (Some(bucket), _) if bucket.sortColumnNames.isEmpty =>
           val bucketExps = resolveColumnNames(bucket.bucketColumnNames, 
query.output)
           query match {
             case RepartitionByExpression(partExpr, _, Some(bucket.numBuckets))
               if partExpr == bucketExps =>
               dataWritingCommand
             case _ =>
               dataWritingCommand.withNewChildInternal(
                 RepartitionByExpression(bucketExps, query, 
Some(bucket.numBuckets)))
           }
       }
     }
   
   
     override def apply(plan: LogicalPlan): LogicalPlan = {
       insertRepartition(plan)
     }
   
     private def insertRepartition(plan: LogicalPlan): LogicalPlan = plan 
resolveOperators {
       case c @ CreateDataSourceTableAsSelectCommand(table, _, query, _) if 
query.resolved =>
         val dynamicPartExps = resolveColumnNames(table.partitionColumnNames, 
query.output)
         coalesceOutputFiles(c, table.bucketSpec, dynamicPartExps)
   
       case i @ InsertIntoHadoopFsRelationCommand(
           _, staticParts, _, partColumns, bucket, _, _, query, _, _, _, _) if 
query.resolved =>
         val dynamicPartExps =
           partColumns.filterNot(p => staticParts.exists(s => 
conf.resolver(p.name, s._1)))
         coalesceOutputFiles(i, bucket, dynamicPartExps)
   
       // case c @ CreateTableAsSelect(table, _, _, query, _, _, _) if 
query.resolved => c
   
       case a @ AppendData(table, query, _, _, _) =>
         query match {
           case _: CoalescePartitions =>
             a
           case _ =>
             a.withNewQuery(CoalescePartitions(Nil, query))
         }
   
       case a @ ReplaceTableAsSelect(_, _, partitioning, query, _, _, _) =>
         query match {
           case _: CoalescePartitions =>
             a
           case _ =>
             a.withNewChildInternal(CoalescePartitions(Nil, query))
         }
     }
   }
   ```
   
   Test:
   ```scala
         sql(
           """
             |CREATE TABLE t1 using parquet
             |CLUSTERED BY (key) SORTED BY(value) into 10 buckets
             |AS (SELECT id AS key, id AS value FROM range(5))
             |""".stripMargin).queryExecution.analyzed match {
           case CreateDataSourceTableAsSelectCommand(_, _,
               Sort(_, false, RepartitionByExpression(_, _, Some(10))), _) =>
           case other =>
             fail(
               s"""
                  |== FAIL: Plans do not match ===
                  |${rewriteNameFromAttrNullability(other).treeString}
               """.stripMargin)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to