Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/22206#discussion_r212489210
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
---
@@ -130,10 +133,22 @@ object DataSourceV2Strategy extends Strategy {
        config)

      val filterCondition = postScanFilters.reduceLeftOption(And)
-      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+
+      val withFilter = if (filterCondition.exists(hasScalarPythonUDF)) {
+        // add a projection before FilterExec to ensure that the rows are converted to unsafe
+        val filterExpr = filterCondition.get
+        FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan))
+      } else {
+        filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+      }

      // always add the projection, which will produce unsafe rows required by some operators
-      ProjectExec(project, withFilter) :: Nil
+      if (project.exists(hasScalarPythonUDF)) {
+        val references = project.map(_.references).reduce(_ ++ _).toSeq
+        ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
--- End diff ---
That one was only added if there was a filter and that filter ran a UDF.
This will add an unnecessary project if both the filter and the project have
Python UDFs, but I thought that was probably okay. I can add a boolean that
signals whether the filter already added a projection, if you think it's worth it.
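For illustration, here is a minimal, self-contained sketch of that boolean idea. The `Plan`, `Scan`, `Filter`, and `Project` case classes below are stand-in stubs, not the real `SparkPlan`/`FilterExec`/`ProjectExec`, and the `filterHasUDF`/`projectHasUDF` flags stand in for the `hasScalarPythonUDF` checks; only the control flow mirrors the strategy above.

```scala
// Hypothetical sketch: track whether the filter branch already inserted the
// extra projection that forces unsafe-row conversion, so the project branch
// does not add a second, redundant one.
sealed trait Plan
case class Scan(name: String) extends Plan
case class Filter(child: Plan) extends Plan   // stands in for FilterExec
case class Project(child: Plan) extends Plan  // stands in for ProjectExec

def plan(scan: Plan,
         filterHasUDF: Option[Boolean], // None = no post-scan filter
         projectHasUDF: Boolean): Plan = {
  // the boolean signal: did the filter branch already add a projection?
  var projectionAdded = false
  val withFilter = filterHasUDF match {
    case Some(true) =>
      projectionAdded = true
      Filter(Project(scan)) // projection below the filter forces unsafe rows
    case Some(false) => Filter(scan)
    case None => scan
  }
  // only add the extra projection when the filter did not already add one
  if (projectHasUDF && !projectionAdded) Project(Project(withFilter))
  else Project(withFilter)
}
```

With the flag, a query whose filter and project both contain Python UDFs gets one inserted projection instead of two: `plan(Scan("t"), Some(true), projectHasUDF = true)` yields `Project(Filter(Project(Scan("t"))))`.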
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]