GitHub user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22206#discussion_r212488439
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -130,10 +133,22 @@ object DataSourceV2Strategy extends Strategy {
config)
val filterCondition = postScanFilters.reduceLeftOption(And)
- val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+
+ val withFilter = if (filterCondition.exists(hasScalarPythonUDF)) {
+ // add a projection before FilterExec to ensure that the rows are converted to unsafe
+ val filterExpr = filterCondition.get
+ FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan))
+ } else {
+ filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+ }
// always add the projection, which will produce unsafe rows required by some operators
- ProjectExec(project, withFilter) :: Nil
+ if (project.exists(hasScalarPythonUDF)) {
+ val references = project.map(_.references).reduce(_ ++ _).toSeq
+ ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
--- End diff --
Oh, I see. The inner projection is also used to make sure that a PythonUDF in the top Project takes unsafe rows as input.
---