Github user sethah commented on a diff in the pull request:
https://github.com/apache/spark/pull/21546#discussion_r199499070
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -38,70 +39,75 @@ import org.apache.spark.util.Utils
 /**
- * Store Arrow data in a form that can be serialized by Spark and served to a Python process.
+ * Writes serialized ArrowRecordBatches to a DataOutputStream in the Arrow stream format.
  */
-private[sql] class ArrowPayload private[sql] (payload: Array[Byte]) extends Serializable {
+private[sql] class ArrowBatchStreamWriter(
+    schema: StructType,
+    out: OutputStream,
+    timeZoneId: String) {
 
-  /**
-   * Convert the ArrowPayload to an ArrowRecordBatch.
-   */
-  def loadBatch(allocator: BufferAllocator): ArrowRecordBatch = {
-    ArrowConverters.byteArrayToBatch(payload, allocator)
-  }
+  val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+  val writeChannel = new WriteChannel(Channels.newChannel(out))
+
+  // Write the Arrow schema first, before batches
+  MessageSerializer.serialize(writeChannel, arrowSchema)
 
   /**
-   * Get the ArrowPayload as a type that can be served to Python.
+   * Consume iterator to write each serialized ArrowRecordBatch to the stream.
    */
-  def asPythonSerializable: Array[Byte] = payload
-}
-
-/**
- * Iterator interface to iterate over Arrow record batches and return rows
- */
-private[sql] trait ArrowRowIterator extends Iterator[InternalRow] {
+  def writeBatches(arrowBatchIter: Iterator[Array[Byte]]): Unit = {
+    arrowBatchIter.foreach { batchBytes =>
--- End diff ---
nit: `arrowBatchIter.foreach(writeChannel.write)`
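
For what it's worth, here is a minimal, self-contained sketch of the equivalence behind the nit, assuming `WriteChannel.write` accepts the serialized batch bytes directly as the diff implies; the `SinkChannel` class below is a stand-in for illustration, not the Arrow `WriteChannel`:

```scala
import java.io.ByteArrayOutputStream

// Stand-in for a channel with a single-argument write method (hypothetical, for illustration only).
class SinkChannel(out: ByteArrayOutputStream) {
  def write(bytes: Array[Byte]): Unit = out.write(bytes)
}

object ForeachSketch {
  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    val channel = new SinkChannel(out)
    val batches: Iterator[Array[Byte]] = Iterator(Array[Byte](1, 2), Array[Byte](3))

    // Explicit closure form, as written in the diff:
    //   batches.foreach { batchBytes => channel.write(batchBytes) }
    // Method value form, as suggested above; Scala eta-expands `channel.write`
    // into the `Array[Byte] => Unit` function that `foreach` expects.
    batches.foreach(channel.write)

    println(out.toByteArray.toList)  // prints List(1, 2, 3)
  }
}
```

Both forms write the same bytes; the method value just drops the intermediate closure.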
---