Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/21546#discussion_r199615244
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
---
@@ -38,70 +39,75 @@ import org.apache.spark.util.Utils
/**
- * Store Arrow data in a form that can be serialized by Spark and served
to a Python process.
+ * Writes serialized ArrowRecordBatches to a DataOutputStream in the Arrow
stream format.
*/
-private[sql] class ArrowPayload private[sql] (payload: Array[Byte])
extends Serializable {
+private[sql] class ArrowBatchStreamWriter(
+ schema: StructType,
+ out: OutputStream,
+ timeZoneId: String) {
- /**
- * Convert the ArrowPayload to an ArrowRecordBatch.
- */
- def loadBatch(allocator: BufferAllocator): ArrowRecordBatch = {
- ArrowConverters.byteArrayToBatch(payload, allocator)
- }
+ val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+ val writeChannel = new WriteChannel(Channels.newChannel(out))
+
+ // Write the Arrow schema first, before batches
+ MessageSerializer.serialize(writeChannel, arrowSchema)
/**
- * Get the ArrowPayload as a type that can be served to Python.
+ * Consume iterator to write each serialized ArrowRecordBatch to the
stream.
*/
- def asPythonSerializable: Array[Byte] = payload
-}
-
-/**
- * Iterator interface to iterate over Arrow record batches and return rows
- */
-private[sql] trait ArrowRowIterator extends Iterator[InternalRow] {
+ def writeBatches(arrowBatchIter: Iterator[Array[Byte]]): Unit = {
+ arrowBatchIter.foreach { batchBytes =>
+ writeChannel.write(batchBytes)
+ }
+ }
/**
- * Return the schema loaded from the Arrow record batch being iterated
over
+ * End the Arrow stream, does not close output stream.
*/
- def schema: StructType
+ def end(): Unit = {
+ // Write End of Stream
--- End diff --
Since right now it's just writing a `0`, I think it's useful to comment
that this means the EOS code. I have a TODO here to wrap this call in an Arrow
function that will be clearer; then we wouldn't need a comment.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]