wangyum commented on pull request #28032: URL: https://github.com/apache/spark/pull/28032#issuecomment-855549147
@cloud-fan @HyukjinKwon Could we review https://github.com/apache/spark/pull/32781 first? This PR still needs Data Source V2 support, and its tests are not very robust. I plan to update this rule to something like this:
```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand}
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

object CoalesceOutputFiles extends Rule[LogicalPlan] {

  // Resolve column names against the query output, failing analysis if a
  // name cannot be found.
  private def resolveColumnNames(
      columnNames: Seq[String],
      outputAttrs: Seq[Attribute]): Seq[NamedExpression] = {
    columnNames.map { c =>
      outputAttrs.resolve(c :: Nil, conf.resolver).getOrElse(
        throw new AnalysisException(s"Cannot resolve column name $c among " +
          s"(${outputAttrs.map(_.name).mkString(",")})."))
    }
  }

  private def coalesceOutputFiles(
      dataWritingCommand: DataWritingCommand,
      bucketSpec: Option[BucketSpec],
      partitionColumns: Seq[Expression]): LogicalPlan = {
    val query = dataWritingCommand.query
    (bucketSpec, partitionColumns) match {
      // Neither bucketed nor dynamically partitioned: just coalesce.
      case (None, Nil) =>
        query match {
          case _: CoalescePartitions => dataWritingCommand
          case _ =>
            dataWritingCommand.withNewChildInternal(CoalescePartitions(Nil, query))
        }
      // Dynamically partitioned: repartition by the dynamic partition columns.
      case (None, partExps @ _ +: _) =>
        query match {
          case RepartitionByExpression(partExpressions, _, _)
              if partExpressions == partExps =>
            dataWritingCommand
          case _ =>
            dataWritingCommand.withNewChildInternal(
              RepartitionByExpression(partExps, query, None))
        }
      // Bucketed with sort columns: repartition by the bucket columns, then sort.
      case (Some(bucket), _) if bucket.sortColumnNames.nonEmpty =>
        val bucketExps = resolveColumnNames(bucket.bucketColumnNames, query.output)
        val sortExps = resolveColumnNames(bucket.sortColumnNames, query.output)
          .map(SortOrder(_, Ascending))
        query match {
          case Sort(order, false, RepartitionByExpression(partExpr, _, Some(bucket.numBuckets)))
              if order == sortExps && partExpr == bucketExps =>
            dataWritingCommand
          case _ =>
            dataWritingCommand.withNewChildInternal(
              Sort(sortExps, false,
                RepartitionByExpression(bucketExps, query, Some(bucket.numBuckets))))
        }
      // Bucketed without sort columns: repartition by the bucket columns only.
      case (Some(bucket), _) if bucket.sortColumnNames.isEmpty =>
        val bucketExps = resolveColumnNames(bucket.bucketColumnNames, query.output)
        query match {
          case RepartitionByExpression(partExpr, _, Some(bucket.numBuckets))
              if partExpr == bucketExps =>
            dataWritingCommand
          case _ =>
            dataWritingCommand.withNewChildInternal(
              RepartitionByExpression(bucketExps, query, Some(bucket.numBuckets)))
        }
    }
  }

  override def apply(plan: LogicalPlan): LogicalPlan = insertRepartition(plan)

  private def insertRepartition(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case c @ CreateDataSourceTableAsSelectCommand(table, _, query, _) if query.resolved =>
      val dynamicPartExps = resolveColumnNames(table.partitionColumnNames, query.output)
      coalesceOutputFiles(c, table.bucketSpec, dynamicPartExps)
    case i @ InsertIntoHadoopFsRelationCommand(
        _, staticParts, _, partColumns, bucket, _, _, query, _, _, _, _) if query.resolved =>
      // Only dynamic partitions (those without a static value) need repartitioning.
      val dynamicPartExps =
        partColumns.filterNot(p => staticParts.exists(s => conf.resolver(p.name, s._1)))
      coalesceOutputFiles(i, bucket, dynamicPartExps)
    // case c @ CreateTableAsSelect(table, _, _, query, _, _, _) if query.resolved => c
    case a @ AppendData(table, query, _, _, _) =>
      query match {
        case _: CoalescePartitions => a
        case _ => a.withNewQuery(CoalescePartitions(Nil, query))
      }
    case a @ ReplaceTableAsSelect(_, _, partitioning, query, _, _, _) =>
      query match {
        case _: CoalescePartitions => a
        case _ => a.withNewChildInternal(CoalescePartitions(Nil, query))
      }
  }
}
```
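For context, `CoalescePartitions` above is not defined in this PR; it is the node proposed in https://github.com/apache/spark/pull/32781. A minimal sketch of what such a node might look like (the name is from that PR, but the fields here are my assumption, not the final API):
```scala
// Rough sketch only: the real node comes from apache/spark#32781;
// partitionExprs and the exact shape are assumptions for illustration.
case class CoalescePartitions(
    partitionExprs: Seq[Expression],
    child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override protected def withNewChildInternal(newChild: LogicalPlan): CoalescePartitions =
    copy(child = newChild)
}
```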
Test:
```scala
sql(
  """
    |CREATE TABLE t1 USING parquet
    |CLUSTERED BY (key) SORTED BY (value) INTO 10 BUCKETS
    |AS (SELECT id AS key, id AS value FROM range(5))
    |""".stripMargin).queryExecution.analyzed match {
  case CreateDataSourceTableAsSelectCommand(_, _,
      Sort(_, false, RepartitionByExpression(_, _, Some(10))), _) => // expected shape
  case other => fail(
    s"""
       |== FAIL: Plans do not match ===
       |${rewriteNameFromAttrNullability(other).treeString}
     """.stripMargin)
}
```
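For the plain (non-bucketed, non-partitioned) write path, the assertion would reduce to something like this (illustrative sketch under the same assumptions about `CoalescePartitions`; `fail` is the usual ScalaTest helper as in the test above):
```scala
// Illustrative only: a plain CTAS should end up with just a
// CoalescePartitions node under the write command.
sql("CREATE TABLE t2 USING parquet AS SELECT id FROM range(5)")
  .queryExecution.analyzed match {
  case CreateDataSourceTableAsSelectCommand(_, _, _: CoalescePartitions, _) => // expected shape
  case other => fail(s"Unexpected plan:\n${other.treeString}")
}
```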
