hvanhovell commented on code in PR #53452:
URL: https://github.com/apache/spark/pull/53452#discussion_r2612901978


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -296,87 +296,59 @@ private[sql] object ArrowConverters extends Logging {
    * @param context Task Context for Spark
    */
   private[sql] class InternalRowIteratorFromIPCStream(
-      input: Array[Byte],
-      context: TaskContext) extends Iterator[InternalRow] {
-
-    // Keep all the resources we have opened in order, should be closed
-    // in reverse order finally.
-    private val resources = new ArrayBuffer[AutoCloseable]()
+      ipcStreams: Iterator[Array[Byte]],
+      context: TaskContext)
+    extends CloseableIterator[InternalRow] {
 
     // Create an allocator used for all Arrow related memory.
     protected val allocator: BufferAllocator = 
ArrowUtils.rootAllocator.newChildAllocator(
       s"to${this.getClass.getSimpleName}",
       0,
       Long.MaxValue)
-    resources.append(allocator)
 
-    private val reader = try {
-      new ArrowStreamReader(new ByteArrayInputStream(input), allocator)
-    } catch {
-      case e: Exception =>
-        closeAll(resources.toSeq.reverse: _*)
-        throw new IllegalArgumentException(
-          s"Failed to create ArrowStreamReader: ${e.getMessage}", e)
-    }
-    resources.append(reader)
-
-    private val root: VectorSchemaRoot = try {
-      reader.getVectorSchemaRoot
-    } catch {
-      case e: Exception =>
-        closeAll(resources.toSeq.reverse: _*)
-        throw new IllegalArgumentException(
-          s"Failed to read schema from IPC stream: ${e.getMessage}", e)
+    private val reader = {
+      val messages = ipcStreams.map { bytes =>
+        new MessageIterator(new ByteArrayInputStream(bytes), allocator)
+      }
+      new ConcatenatingArrowStreamReader(allocator, messages, destructive = 
true)

Review Comment:
   This reuses `ConcatenatingArrowStreamReader`, a component that was used in the
   Spark Connect Scala client. It allows us to concatenate multiple IPC streams.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to