This is an automated email from the ASF dual-hosted git repository. meng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4d770db [SPARK-27968] ArrowEvalPythonExec.evaluate shouldn't eagerly read the first row 4d770db is described below commit 4d770db0eb252c56072f093eae318bad3d20b8d7 Author: Xiangrui Meng <m...@databricks.com> AuthorDate: Thu Jun 6 15:45:44 2019 -0700 [SPARK-27968] ArrowEvalPythonExec.evaluate shouldn't eagerly read the first row ## What changes were proposed in this pull request? Issue fixed in https://github.com/apache/spark/pull/24734 but that PR might take longer to merge. ## How was this patch tested? It should pass existing unit tests. Closes #24816 from mengxr/SPARK-27968. Authored-by: Xiangrui Meng <m...@databricks.com> Signed-off-by: Xiangrui Meng <m...@databricks.com> --- .../sql/execution/python/ArrowEvalPythonExec.scala | 27 ++++------------------ 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index 000ae97..73a43af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -86,28 +86,11 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute] sessionLocalTimeZone, pythonRunnerConf).compute(batchIter, context.partitionId(), context) - new Iterator[InternalRow] { - - private var currentIter = if (columnarBatchIter.hasNext) { - val batch = columnarBatchIter.next() - val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType()) - assert(outputTypes == actualDataTypes, "Invalid schema from pandas_udf: " + - s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}") - batch.rowIterator.asScala - } else { - Iterator.empty - } - - override def hasNext: Boolean = currentIter.hasNext || { - if
(columnarBatchIter.hasNext) { - currentIter = columnarBatchIter.next().rowIterator.asScala - hasNext - } else { - false - } - } - - override def next(): InternalRow = currentIter.next() + columnarBatchIter.flatMap { batch => + val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType()) + assert(outputTypes == actualDataTypes, "Invalid schema from pandas_udf: " + + s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}") + batch.rowIterator.asScala } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org