HyukjinKwon commented on a change in pull request #24795: [SPARK-27945][SQL]
Minimal changes to support columnar processing
URL: https://github.com/apache/spark/pull/24795#discussion_r300329454
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
##########
@@ -840,34 +874,55 @@ case class CollapseCodegenStages(
/**
* Inserts an InputAdapter on top of those that do not support codegen.
*/
- private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match {
- case p if !supportCodegen(p) =>
- // collapse them recursively
- InputAdapter(insertWholeStageCodegen(p))
- case j: SortMergeJoinExec =>
- // The children of SortMergeJoin should do codegen separately.
- j.withNewChildren(j.children.map(child =>
InputAdapter(insertWholeStageCodegen(child))))
- case p =>
- p.withNewChildren(p.children.map(insertInputAdapter))
+ private def insertInputAdapter(plan: SparkPlan, isColumnarInput: Boolean):
SparkPlan = {
+ val isColumnar = adjustColumnar(plan, isColumnarInput)
+ plan match {
+ case p if !supportCodegen(p) =>
+ // collapse them recursively
+ InputAdapter(insertWholeStageCodegen(p, isColumnar), isColumnar)
+ case j: SortMergeJoinExec =>
+ // The children of SortMergeJoin should do codegen separately.
+ j.withNewChildren(j.children.map(
+ child => InputAdapter(insertWholeStageCodegen(child, isColumnar),
isColumnar)))
+ case p =>
+ p.withNewChildren(p.children.map(insertInputAdapter(_, isColumnar)))
+ }
}
/**
* Inserts a WholeStageCodegen on top of those that support codegen.
*/
- private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match
{
- // For operators that will output domain object, do not insert
WholeStageCodegen for it as
- // domain object can not be written into unsafe row.
- case plan if plan.output.length == 1 &&
plan.output.head.dataType.isInstanceOf[ObjectType] =>
- plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
- case plan: CodegenSupport if supportCodegen(plan) =>
-
WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
- case other =>
- other.withNewChildren(other.children.map(insertWholeStageCodegen))
+ private def insertWholeStageCodegen(plan: SparkPlan, isColumnarInput:
Boolean): SparkPlan = {
+ val isColumnar = adjustColumnar(plan, isColumnarInput)
+ plan match {
+ // For operators that will output domain object, do not insert
WholeStageCodegen for it as
+ // domain object can not be written into unsafe row.
+ case plan if plan.output.length == 1 &&
plan.output.head.dataType.isInstanceOf[ObjectType] =>
+ plan.withNewChildren(plan.children.map(insertWholeStageCodegen(_,
isColumnar)))
+ case plan: CodegenSupport if supportCodegen(plan) =>
+ WholeStageCodegenExec(
+ insertInputAdapter(plan,
isColumnar))(codegenStageCounter.incrementAndGet())
+ case other =>
+ other.withNewChildren(other.children.map(insertWholeStageCodegen(_,
isColumnar)))
+ }
+ }
+
+ /**
+ * Depending on the stage in the plan and if we currently are columnar or not
+ * return if we are still columnar or not.
+ */
+ private def adjustColumnar(plan: SparkPlan, isColumnar: Boolean): Boolean =
+ // We are walking up the plan, so columnar starts when we transition to
rows
+ // and ends when we transition to columns
+ plan match {
Review comment:
Not a big deal, but the indentation here looks off.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]