Github user icexelloss commented on a diff in the pull request:
https://github.com/apache/spark/pull/22104#discussion_r210390399
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
---
@@ -117,15 +117,16 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF],
output: Seq[Attribute], chil
}
}.toArray
}.toArray
- val projection = newMutableProjection(allInputs, child.output)
+ val projection = UnsafeProjection.create(allInputs, child.output)
val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
StructField(s"_$i", dt)
})
// Add rows to queue to join later with the result.
val projectedRowIter = iter.map { inputRow =>
- queue.add(inputRow.asInstanceOf[UnsafeRow])
- projection(inputRow)
+ val unsafeRow = projection(inputRow)
+ queue.add(unsafeRow.asInstanceOf[UnsafeRow])
--- End diff --
This is probably another bug I found while testing this: if the input node to
EvalPythonExec doesn't produce UnsafeRow, the cast here will fail.
I hit this when I passed in a test data source scan node, which produces
GenericInternalRow; the cast to UnsafeRow throws an exception here.
I am happy to submit this as a separate patch if people think it's
necessary.
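
A minimal sketch of the failure mode described above, assuming it is pasted into a `spark-shell` (or run with spark-catalyst on the classpath). The two-column row and its values are made up for illustration and are not taken from the test that surfaced the problem; only the `GenericInternalRow` versus `UnsafeRow` distinction comes from the comment.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical input row, standing in for what a non-unsafe scan node might emit.
val inputRow: InternalRow =
  new GenericInternalRow(Array[Any](1, UTF8String.fromString("a")))

// The old code path did the equivalent of this cast, which throws
// ClassCastException because the row is not an UnsafeRow:
//   inputRow.asInstanceOf[UnsafeRow]
val isUnsafe = inputRow.isInstanceOf[UnsafeRow]

// Running the row through an UnsafeProjection yields an UnsafeRow
// regardless of the concrete InternalRow type of the input.
val toUnsafe = UnsafeProjection.create(Array[DataType](IntegerType, StringType))
val unsafeRow: UnsafeRow = toUnsafe(inputRow)
```

Note that the sketch projects the whole input row, whereas the projection in the diff is built from `allInputs`, so it only illustrates the type conversion, not the exact rows that end up in the queue.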
---