Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21546#discussion_r196277485
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
    @@ -183,34 +182,131 @@ private[sql] object ArrowConverters {
       }
     
       /**
    -   * Convert a byte array to an ArrowRecordBatch.
    +   * Load a serialized ArrowRecordBatch.
        */
    -  private[arrow] def byteArrayToBatch(
    +  private[arrow] def loadBatch(
           batchBytes: Array[Byte],
           allocator: BufferAllocator): ArrowRecordBatch = {
    -    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
    -    val reader = new ArrowFileReader(in, allocator)
    -
    -    // Read a batch from a byte stream, ensure the reader is closed
    -    Utils.tryWithSafeFinally {
    -      val root = reader.getVectorSchemaRoot  // throws IOException
    -      val unloader = new VectorUnloader(root)
    -      reader.loadNextBatch()  // throws IOException
    -      unloader.getRecordBatch
    -    } {
    -      reader.close()
    -    }
    +    val in = new ByteArrayInputStream(batchBytes)
    +    MessageSerializer.deserializeMessageBatch(new 
ReadChannel(Channels.newChannel(in)), allocator)
    +      .asInstanceOf[ArrowRecordBatch]  // throws IOException
       }
     
    +  /**
    +   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
    +   */
       private[sql] def toDataFrame(
    -      payloadRDD: JavaRDD[Array[Byte]],
    +      arrowBatchRDD: JavaRDD[Array[Byte]],
           schemaString: String,
           sqlContext: SQLContext): DataFrame = {
    -    val rdd = payloadRDD.rdd.mapPartitions { iter =>
    +    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
    +    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
    +    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
           val context = TaskContext.get()
    -      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), 
context)
    +      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
         }
    -    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
         sqlContext.internalCreateDataFrame(rdd, schema)
       }
    +
    +  /**
    +   * Read a file as an Arrow stream and return an RDD of serialized 
ArrowRecordBatches.
    +   */
    +  private[sql] def readArrowStreamFromFile(sqlContext: SQLContext, 
filename: String):
    +  JavaRDD[Array[Byte]] = {
    +    val fileStream = new FileInputStream(filename)
    +    try {
    +      // Create array so that we can safely close the file
    +      val batches = getBatchesFromStream(fileStream.getChannel).toArray
    +      // Parallelize the record batches to create an RDD
    +      JavaRDD.fromRDD(sqlContext.sparkContext.parallelize(batches, 
batches.length))
    +    } finally {
    +      fileStream.close()
    +    }
    +  }
    +
    +  /**
    +   * Read an Arrow stream input and return an iterator of serialized 
ArrowRecordBatches.
    +   */
    +  private[sql] def getBatchesFromStream(in: SeekableByteChannel): 
Iterator[Array[Byte]] = {
    +
    +    // TODO: simplify in super class
    +    class RecordBatchMessageReader(inputChannel: SeekableByteChannel) {
    --- End diff --
    
    Sure. The main reason that I am not sure about this code is that the code 
here breaks encapsulation.
    
    If I understand correctly, Arrow reader only supports reading record batch 
from Channel to Arrow memory, in order to read record batch from Channel to 
on-heap memory directly, we need to subclass `MessageChannelReader`, overwrite 
`readNextMessage` to load the the metadata and body of record batch.
    
    Now the main point that I feel not comfortable with this approach:
    The subclass changes the behavior of `readNextMessage` to load  both 
metadata and body of a record batch, where in the parent class it only loads 
meta of a record batch. And I think this is the contract of the interface too 
so this feels a bit hacky.
    
    I am not saying I am totally against this for performance reasons, but 
considering the code path already involves writing data to disk (so avoid one 
memory copy won't necessary get us much) and is one of the less frequent 
operations (pandas DataFrame -> spark DataFrame), I am not sure it's worth it, 
that's why I suggest resolving this separately so not to block this PR.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to