Github user icexelloss commented on a diff in the pull request:
https://github.com/apache/spark/pull/22104#discussion_r212340459
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
@@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
}
}.toArray
}.toArray
-    val projection = newMutableProjection(allInputs, child.output)
+
+    // Project input rows to unsafe rows so we can put them in the row queue
+    val unsafeProjection = UnsafeProjection.create(child.output, child.output)
--- End diff --
@cloud-fan Sorry, I don't think I am being very clear...
> If the data source does not produce UnsafeRow, Spark will make sure there will be a project above it to produce UnsafeRow
I don't think this is happening for DataSource V2 right now. For example (code run in a PySpark test):
```
from pyspark.sql.functions import udf

datasource_v2_df = self.spark.read \
    .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
    .load()
result = datasource_v2_df.withColumn('x', udf(lambda x: x, 'int')(datasource_v2_df['i']))
result.show()
```
The code above fails with:
```
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1$$anonfun$5.apply(EvalPythonExec.scala:127)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1$$anonfun$5.apply(EvalPythonExec.scala:126)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074)
```
I think this is an issue with DataSource V2 that should probably be addressed in a separate PR (DataSource V1 works fine). @cloud-fan WDYT?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]