hvanhovell commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1014269647


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
       responseObserver.onNext(response.build())
     }
 
-    responseObserver.onNext(sendMetricsToResponse(clientId, rows))
+    responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
+    responseObserver.onCompleted()
+  }
+
+  def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit 
= {
+    val spark = dataframe.sparkSession
+    val schema = dataframe.schema
+    // TODO: control the batch size instead of max records
+    val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
+    val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+
+    val rows = dataframe.queryExecution.executedPlan.execute()
+    var numBatches = 0L
+
+    if (rows.getNumPartitions > 0) {
+      val batches = rows.mapPartitionsInternal { iter =>
+        ArrowConverters
+          .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
+      }
+
+      val obj = new Object
+
+      val processPartition = (iter: Iterator[(Array[Byte], Long, Long)]) => 
iter.toArray

Review Comment:
   This breaks sorted results. A higher partition can complete earlier than 
lower ones thus breaking the order. That is why I the snippet I posted buffered 
the partitions in the handler, while the main thread scanned over them 1 by 1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to