pan3793 commented on code in PR #4662:
URL: https://github.com/apache/kyuubi/pull/4662#discussion_r1160386535
##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala:
##########
@@ -68,11 +125,107 @@ object SparkDatasetHelper {
* Fork from Apache Spark-3.3.1
org.apache.spark.sql.catalyst.util.quoteIfNeeded to adapt to
* Spark-3.1.x
*/
- def quoteIfNeeded(part: String): String = {
+ private def quoteIfNeeded(part: String): String = {
if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
part
} else {
s"`${part.replace("`", "``")}`"
}
}
+
+ private lazy val maxBatchSize: Long = {
+ // respect spark connect config
+ KyuubiSparkUtil.globalSparkContext
+ .getConf
+ .getOption("spark.connect.grpc.arrow.maxBatchSize")
+ .orElse(Option("4m"))
+ .map(JavaUtils.byteStringAs(_, ByteUnit.MiB))
+ .get
+ }
+
+ private def doCollectLimit(collectLimit: CollectLimitExec):
Array[Array[Byte]] = {
+ // TODO: SparkPlan.session introduced in SPARK-35798, replace with
SparkPlan.session once we
+ // drop Spark-3.1.x support.
+ val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
+ val maxRecordsPerBatch =
SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
+
+ val batches = KyuubiArrowConverters.takeAsArrowBatches(
+ collectLimit,
+ maxRecordsPerBatch,
+ maxBatchSize,
+ timeZoneId)
+
+ // note that the number of rows in the returned arrow batches may be >=
`limit`, perform
+ // the slicing operation of result
+ val result = ArrayBuffer[Array[Byte]]()
+ var i = 0
+ var rest = collectLimit.limit
+ while (i < batches.length && rest > 0) {
+ val (batch, size) = batches(i)
+ if (size <= rest) {
+ result += batch
+ // returned ArrowRecordBatch has less than `limit` row count, safety
to do conversion
+ rest -= size.toInt
+ } else { // size > rest
+ result += KyuubiArrowConverters.slice(collectLimit.schema, timeZoneId,
batch, 0, rest)
+ rest = 0
+ }
+ i += 1
+ }
+ result.toArray
+ }
+
+ /**
+ * This method provides a reflection-based implementation of
+ * [[AdaptiveSparkPlanExec.finalPhysicalPlan]] that enables us to adapt to
the Spark runtime
+ * without patching SPARK-41914.
+ *
+ * TODO: Once we drop support for Spark 3.1.x, we can directly call
+ * [[AdaptiveSparkPlanExec.finalPhysicalPlan]].
+ */
+ def finalPhysicalPlan(adaptiveSparkPlanExec: AdaptiveSparkPlanExec):
SparkPlan = {
+ withFinalPlanUpdate(adaptiveSparkPlanExec, identity)
+ }
+
+ /**
+ * A reflection-based implementation of
[[AdaptiveSparkPlanExec.withFinalPlanUpdate]].
+ */
+ private def withFinalPlanUpdate[T](
+ adaptiveSparkPlanExec: AdaptiveSparkPlanExec,
+ fun: SparkPlan => T): T = {
+ val getFinalPhysicalPlan = DynMethods.builder("getFinalPhysicalPlan")
+ .hiddenImpl(adaptiveSparkPlanExec.getClass)
+ .build()
+ val plan = getFinalPhysicalPlan.invoke[SparkPlan](adaptiveSparkPlanExec)
+ val result = fun(plan)
+ val finalPlanUpdate = DynMethods.builder("finalPlanUpdate")
+ .hiddenImpl(adaptiveSparkPlanExec.getClass)
+ .build()
+ finalPlanUpdate.invoke[Unit](adaptiveSparkPlanExec)
+ result
+ }
+
+ /**
+ * offset support was add since Spark-3.4(set SPARK-28330), to ensure
backward compatibility with
+ * earlier versions of Spark, this function uses reflective calls to the
"offset".
+ */
+ private def offset(collectLimitExec: CollectLimitExec): Int = {
+ val offset = DynMethods.builder("offset")
+ .impl(collectLimitExec.getClass)
+ .orNoop()
+ .build()
+ Option(offset.invoke[Int](collectLimitExec))
+ .getOrElse(0)
Review Comment:
nit: merge to previous line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]