hvanhovell commented on code in PR #38468: URL: https://github.com/apache/spark/pull/38468#discussion_r1014269647
########## connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala: ########## @@ -117,7 +126,70 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte responseObserver.onNext(response.build()) } - responseObserver.onNext(sendMetricsToResponse(clientId, rows)) + responseObserver.onNext(sendMetricsToResponse(clientId, dataframe)) + responseObserver.onCompleted() + } + + def processRowsAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = { + val spark = dataframe.sparkSession + val schema = dataframe.schema + // TODO: control the batch size instead of max records + val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch + val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone + + val rows = dataframe.queryExecution.executedPlan.execute() + var numBatches = 0L + + if (rows.getNumPartitions > 0) { + val batches = rows.mapPartitionsInternal { iter => + ArrowConverters + .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId) + } + + val obj = new Object + + val processPartition = (iter: Iterator[(Array[Byte], Long, Long)]) => iter.toArray Review Comment: This breaks sorted results. A higher partition can complete earlier than lower ones thus breaking the order. That is why I the snippet I posted buffered the partitions in the handler, while the main thread scanned over them 1 by 1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org