[GitHub] spark pull request #20485: [SPARK-23315][SQL] failed to get output from cano...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20485
[GitHub] spark pull request #20485: [SPARK-23315][SQL] failed to get output from cano...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20485#discussion_r166436103

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala ---
@@ -81,33 +81,44 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
     // TODO: add more push down rules.

-    pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
+    val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
     // After column pruning, we may have redundant PROJECT nodes in the query plan, remove them.
-    RemoveRedundantProject(filterPushed)
+    RemoveRedundantProject(columnPruned)
   }

   // TODO: nested fields pruning
-  private def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: AttributeSet): Unit = {
+  private def pushDownRequiredColumns(
+      plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
     plan match {
-      case Project(projectList, child) =>
+      case p @ Project(projectList, child) =>
         val required = projectList.flatMap(_.references)
-        pushDownRequiredColumns(child, AttributeSet(required))
+        p.copy(child = pushDownRequiredColumns(child, AttributeSet(required)))

-      case Filter(condition, child) =>
+      case f @ Filter(condition, child) =>
         val required = requiredByParent ++ condition.references
-        pushDownRequiredColumns(child, required)
+        f.copy(child = pushDownRequiredColumns(child, required))

       case relation: DataSourceV2Relation => relation.reader match {
         case reader: SupportsPushDownRequiredColumns =>
+          // TODO: Enable the below assert after we make `DataSourceV2Relation` immutable. Fow now
--- End diff --

Typo: Fow
[GitHub] spark pull request #20485: [SPARK-23315][SQL] failed to get output from cano...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20485#discussion_r165579903

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala ---
@@ -81,33 +81,44 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
     // TODO: add more push down rules.

-    pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
+    val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
     // After column pruning, we may have redundant PROJECT nodes in the query plan, remove them.
-    RemoveRedundantProject(filterPushed)
+    RemoveRedundantProject(columnPruned)
   }

   // TODO: nested fields pruning
-  private def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: AttributeSet): Unit = {
+  private def pushDownRequiredColumns(
+      plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
     plan match {
-      case Project(projectList, child) =>
+      case p @ Project(projectList, child) =>
         val required = projectList.flatMap(_.references)
-        pushDownRequiredColumns(child, AttributeSet(required))
+        p.copy(child = pushDownRequiredColumns(child, AttributeSet(required)))

-      case Filter(condition, child) =>
+      case f @ Filter(condition, child) =>
         val required = requiredByParent ++ condition.references
-        pushDownRequiredColumns(child, required)
+        f.copy(child = pushDownRequiredColumns(child, required))

       case relation: DataSourceV2Relation => relation.reader match {
         case reader: SupportsPushDownRequiredColumns =>
+          // TODO: Enable the below assert after we make `DataSourceV2Relation` immutable. Fow now
+          // it's possible that the mutable reader being updated by someone else, and we need to
+          // always call `reader.pruneColumns` here to correct it.
+          // assert(relation.output.toStructType == reader.readSchema(),
+          //   "Schema of data source reader does not match the relation plan.")
+
           val requiredColumns = relation.output.filter(requiredByParent.contains)
           reader.pruneColumns(requiredColumns.toStructType)
-        case _ =>
+
+          val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
+          val newOutput = reader.readSchema().map(_.name).map(nameToAttr)
+          relation.copy(output = newOutput)
--- End diff --

@rdblue This is the bug I mentioned before. Finally I figured out a way to fix it surgically: always run column pruning, even when no columns need to be pruned. This corrects the required schema of the reader in case it has been updated by someone else.
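The "updated by someone else" case hinges on the reader being mutable shared state. The following is a minimal, self-contained sketch of that situation; the `*Sketch` traits are simplified stand-ins for Spark's real `DataSourceReader` and `SupportsPushDownRequiredColumns` interfaces, and the column names are made up for illustration:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Simplified stand-ins for the DataSourceV2 reader interfaces; the real traits
// (at the time of this PR) live in org.apache.spark.sql.sources.v2.reader and
// carry more methods.
trait DataSourceReaderSketch {
  def readSchema(): StructType
}

trait SupportsPushDownRequiredColumnsSketch extends DataSourceReaderSketch {
  def pruneColumns(requiredSchema: StructType): Unit
}

// A reader whose required schema is mutable state. If another rule (or another
// relation sharing this instance) has already pruned it, readSchema() can drift
// away from what the current plan node expects.
class MutableReaderSketch extends SupportsPushDownRequiredColumnsSketch {
  private val fullSchema = StructType(Seq(
    StructField("i", IntegerType),
    StructField("j", IntegerType),
    StructField("s", StringType)))
  private var required: StructType = fullSchema

  override def readSchema(): StructType = required
  override def pruneColumns(requiredSchema: StructType): Unit = required = requiredSchema
}

object MutableReaderDemo extends App {
  val reader = new MutableReaderSketch
  reader.pruneColumns(StructType(Seq(StructField("i", IntegerType)))) // "someone else" prunes first
  // A plan node that still expects (i, j, s) is now out of sync with the reader.
  // That is why the rule above always calls pruneColumns, even when nothing
  // needs pruning, and then rebuilds the relation's output from readSchema().
  println(reader.readSchema().fieldNames.mkString(", ")) // prints: i
}
```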
[GitHub] spark pull request #20485: [SPARK-23315][SQL] failed to get output from cano...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20485#discussion_r165578555

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala ---
@@ -99,15 +100,22 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
       case relation: DataSourceV2Relation => relation.reader match {
         case reader: SupportsPushDownRequiredColumns =>
+          // TODO: Enable the below assert after we make `DataSourceV2Relation` immutable. Fow now
+          // it's possible that the mutable reader being updated by someone else, and we need to
+          // always call `reader.pruneColumns` here to correct it.
+          // assert(relation.output.toStructType == reader.readSchema(),
+          //   "Schema of data source reader does not match the relation plan.")
+
           val requiredColumns = relation.output.filter(requiredByParent.contains)
           reader.pruneColumns(requiredColumns.toStructType)
+          relation.copy(output = requiredColumns)
--- End diff --

@rdblue This is the bug I mentioned before. Finally I figured out a way to fix it surgically: always run column pruning, even when no columns need to be pruned. This corrects the required schema of the reader in case it has been updated by someone else.
[GitHub] spark pull request #20485: [SPARK-23315][SQL] failed to get output from cano...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/20485

[SPARK-23315][SQL] failed to get output from canonicalized data source v2 related plans

## What changes were proposed in this pull request?

`DataSourceV2Relation` keeps a `fullOutput` and resolves its real output on demand by column-name lookup. This breaks once the plan is canonicalized, because canonicalization turns every attribute name into "None", so the lookup can no longer find the required columns.

To fix this, `DataSourceV2Relation` should just keep `output`, and update the `output` when doing column pruning.

## How was this patch tested?

A new test case.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark canonicalize

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20485.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20485

commit 75950a1725f01c31764ac31d16acd6e2078956c6
Author: Wenchen Fan
Date: 2018-02-02T07:53:07Z

    failed to get output from canonicalized data source v2 related plans
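To make the failure mode concrete, here is a minimal, self-contained sketch of the old name-lookup behaviour versus the fix described above. `AttrSketch`, `OldRelationSketch`, and `NewRelationSketch` are illustrative stand-ins, not the real Catalyst or `DataSourceV2Relation` APIs:

```scala
// Stand-in for a Catalyst attribute: only the fields needed for the example.
case class AttrSketch(name: String, exprId: Long)

// Old behaviour: keep fullOutput and resolve the pruned output on demand by
// column-name lookup.
case class OldRelationSketch(fullOutput: Seq[AttrSketch], requiredNames: Seq[String]) {
  def output: Seq[AttrSketch] = requiredNames.map { n =>
    fullOutput.find(_.name == n).getOrElse(sys.error(s"cannot resolve column $n"))
  }
}

// Fixed behaviour (as the PR describes it): keep the resolved output directly;
// the column-pruning rule rewrites it via copy(output = ...).
case class NewRelationSketch(output: Seq[AttrSketch])

object CanonicalizeDemo extends App {
  val old = OldRelationSketch(Seq(AttrSketch("i", 1), AttrSketch("j", 2)), Seq("i"))
  println(old.output) // resolves fine before canonicalization

  // Canonicalization normalizes attribute names to a placeholder, so the name
  // lookup inside `output` can no longer find "i" -- the bug in SPARK-23315.
  val canonicalized = old.copy(fullOutput = old.fullOutput.map(_.copy(name = "none")))
  // canonicalized.output  // would fail: cannot resolve column i

  // With the fix, the already-resolved attributes survive canonicalization.
  val fixed = NewRelationSketch(Seq(AttrSketch("i", 1)))
  println(fixed.output)
}
```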