BryanCutler commented on a change in pull request #24650: [SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled
URL: https://github.com/apache/spark/pull/24650#discussion_r286271216
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
 ##########
 @@ -3306,27 +3303,24 @@ class Dataset[T] private[sql](
              partitionBatchIndex => batchOrder.append((index, partitionBatchIndex))
             }
           }
-          partitionCount += 1
-
-          // After last batch, end the stream and write batch order indices
-          if (partitionCount == numPartitions) {
-            batchWriter.end()
-            out.writeInt(batchOrder.length)
-            // Sort by (index of partition, batch index in that partition) tuple to get the
-            // overall_batch_index from 0 to N-1 batches, which can be used to put the
-            // transferred batches in the correct order
-            batchOrder.zipWithIndex.sortBy(_._1).foreach { case (_, overallBatchIndex) =>
-              out.writeInt(overallBatchIndex)
-            }
-            out.flush()
-          }
         }
 
+        val arrowBatchRdd = toArrowBatchRdd(plan)
         sparkSession.sparkContext.runJob(
           arrowBatchRdd,
-          (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
-          0 until numPartitions,
-          handlePartitionBatches)
+          (it: Iterator[Array[Byte]]) => it.toArray,
+          handlePartitionBatches _)
+
+        // After processing all partitions, end the stream and write batch order indices
+        batchWriter.end()
+        out.writeInt(batchOrder.length)
+        // Sort by (index of partition, batch index in that partition) tuple to get the
+        // overall_batch_index from 0 to N-1 batches, which can be used to put the
+        // transferred batches in the correct order
+        batchOrder.zipWithIndex.sortBy(_._1).foreach { case (_, overallBatchIndex) =>
+          out.writeInt(overallBatchIndex)
+        }
+        out.flush()
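
For context, a minimal standalone sketch of what the `batchOrder` bookkeeping above computes. The buffer contents and batch labels are made up for illustration; only the `zipWithIndex.sortBy(_._1)` idiom comes from the diff. Partitions report their batches in completion order, and sorting the `(partitionIndex, batchIndexInPartition)` keys yields, for each logical position 0 to N-1, the arrival index of the batch that belongs there:

```scala
import scala.collection.mutable.ArrayBuffer

object BatchOrderSketch {
  def main(args: Array[String]): Unit = {
    // (partition index, batch index within that partition), appended in
    // whatever order the partitions happened to finish.
    val batchOrder = ArrayBuffer[(Int, Int)]()
    batchOrder += ((1, 0)) // partition 1 finished first
    batchOrder += ((1, 1))
    batchOrder += ((0, 0))
    batchOrder += ((2, 0))

    // zipWithIndex pairs each key with its arrival position; sorting by the
    // key puts the pairs in logical order, so the arrival positions read off
    // in that order tell the receiver how to rearrange the batches it got.
    val indices = batchOrder.zipWithIndex.sortBy(_._1).map(_._2)
    println(indices) // ArrayBuffer(2, 0, 1, 3)

    // The receiver holds batches in arrival order and applies the indices:
    val received = Seq("part1-batch0", "part1-batch1", "part0-batch0", "part2-batch0")
    println(indices.map(received))
    // ArrayBuffer(part0-batch0, part1-batch0, part1-batch1, part2-batch0)
  }
}
```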
 
 Review comment:
   This `flush` could be removed now since the output stream is closed right after this returns, which will flush it.
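
   For background on why the `flush` is redundant: `java.io.FilterOutputStream.close()` flushes the stream before closing the one it wraps, and `DataOutputStream` and `BufferedOutputStream` both inherit that behavior. A minimal illustration with plain JDK streams (not Spark's actual `out`):

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream, DataOutputStream}

object CloseFlushesSketch {
  def main(args: Array[String]): Unit = {
    val sink = new ByteArrayOutputStream()
    val out = new DataOutputStream(new BufferedOutputStream(sink))

    out.writeInt(42)
    println(sink.size()) // 0: the four bytes still sit in the buffer

    // close() flushes before closing the underlying stream, so an explicit
    // out.flush() immediately before it is redundant.
    out.close()
    println(sink.size()) // 4: the buffered int reached the sink
  }
}
```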
