viirya commented on a change in pull request #24818: [SPARK-27971][SQL][R] 
MapPartitionsInRWithArrowExec.evaluate shouldn't eagerly read the first batch
URL: https://github.com/apache/spark/pull/24818#discussion_r291811780
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
 ##########
 @@ -243,28 +243,11 @@ case class MapPartitionsInRWithArrowExec(
       // binary in a batch due to the limitation of R API. See also ARROW-4512.
       val columnarBatchIter = runner.compute(batchIter, -1)
       val outputProject = UnsafeProjection.create(output, output)
-      new Iterator[InternalRow] {
-
-        private var currentIter = if (columnarBatchIter.hasNext) {
-          val batch = columnarBatchIter.next()
-          val actualDataTypes = (0 until batch.numCols()).map(i => 
batch.column(i).dataType())
-          assert(outputTypes == actualDataTypes, "Invalid schema from 
dapply(): " +
-            s"expected ${outputTypes.mkString(", ")}, got 
${actualDataTypes.mkString(", ")}")
-          batch.rowIterator.asScala
-        } else {
-          Iterator.empty
-        }
-
-        override def hasNext: Boolean = currentIter.hasNext || {
-          if (columnarBatchIter.hasNext) {
-            currentIter = columnarBatchIter.next().rowIterator.asScala
-            hasNext
-          } else {
-            false
-          }
-        }
-
-        override def next(): InternalRow = currentIter.next()
+      columnarBatchIter.flatMap { batch =>
+        val actualDataTypes = (0 until batch.numCols()).map(i => 
batch.column(i).dataType())
+        assert(outputTypes == actualDataTypes, "Invalid schema from dapply(): 
" +
+          s"expected ${outputTypes.mkString(", ")}, got 
${actualDataTypes.mkString(", ")}")
+        batch.rowIterator.asScala
 
 Review comment:
   My guess is that the original code intended to check the data types of the 
first batch only. I think this fix is right.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to