cfmcgrady commented on code in PR #4392:
URL: https://github.com/apache/kyuubi/pull/4392#discussion_r1113760018
##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala:
##########
@@ -171,3 +172,39 @@ class ExecuteStatement(
s"__kyuubi_operation_result_format__=$resultFormat",
s"__kyuubi_operation_result_arrow_timestampAsString__=$timestampAsString")
}
+
+class ArrowBasedExecuteStatement(
+ session: Session,
+ override val statement: String,
+ override val shouldRunAsync: Boolean,
+ queryTimeout: Long,
+ incrementalCollect: Boolean)
+ extends ExecuteStatement(session, statement, shouldRunAsync, queryTimeout, incrementalCollect) {
+
+ override protected def incrementalCollectResult(): Iterator[Any] = {
+ SparkDatasetHelper.toArrowBatchRdd(convertComplexType(result)).toLocalIterator
+ }
+
+ override protected def fullCollectResult(): Array[_] = {
+ SparkDatasetHelper.toArrowBatchRdd(convertComplexType(result)).collect()
+ }
+
+ override protected def takeResult(maxRows: Int): Array[_] = {
+ // this will introduce shuffle and hurt performance
+ val limitedResult = result.limit(maxRows)
+ SparkDatasetHelper.toArrowBatchRdd(convertComplexType(limitedResult)).collect()
+ }
+
+ /**
+ * assign a new execution id for arrow-based operation.
+ */
+ override protected def collectResult(): Unit = {
+ SQLExecution.withNewExecutionId(result.queryExecution, Some("toArrow")) {
Review Comment:
addressed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]