pan3793 commented on code in PR #4662:
URL: https://github.com/apache/kyuubi/pull/4662#discussion_r1160387144
##########
externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala:
##########
@@ -177,4 +341,101 @@ class SparkArrowbasedOperationSuite extends
WithSparkSQLEngine with SparkDataTyp
.allSessions()
.foreach(_.asInstanceOf[SparkSessionImpl].spark.listenerManager.unregister(listener))
}
+
+ private def withSparkListener[T](listener: SparkListener)(body: => T): T = {
+ withAllSessions(s => s.sparkContext.addSparkListener(listener))
+ try {
+ body
+ } finally {
+ withAllSessions(s => s.sparkContext.removeSparkListener(listener))
+ }
+
+ }
+
+ private def withPartitionedTable[T](viewName: String)(body: => T): T = {
+ withAllSessions { spark =>
+ spark.range(0, 1000, 1, numPartitions = 100)
+ .createOrReplaceTempView(viewName)
+ }
+ try {
+ body
+ } finally {
+ withAllSessions { spark =>
+ spark.sql(s"DROP VIEW IF EXISTS $viewName")
+ }
+ }
+ }
+
+ private def withAllSessions(op: SparkSession => Unit): Unit = {
+ SparkSQLEngine.currentEngine.get
+ .backendService
+ .sessionManager
+ .allSessions()
+ .map(_.asInstanceOf[SparkSessionImpl].spark)
+ .foreach(op(_))
+ }
+
+ private def runAdaptiveAndVerifyResult(query: String): (SparkPlan,
SparkPlan) = {
+ val dfAdaptive = spark.sql(query)
+ val planBefore = dfAdaptive.queryExecution.executedPlan
+ val result = dfAdaptive.collect()
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ val df = spark.sql(query)
+ QueryTest.checkAnswer(df, df.collect().toSeq)
+ }
+ val planAfter = dfAdaptive.queryExecution.executedPlan
+ val adaptivePlan =
planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+ val exchanges = adaptivePlan.collect {
+ case e: Exchange => e
+ }
+ assert(exchanges.isEmpty, "The final plan should not contain any Exchange
node.")
+ (dfAdaptive.queryExecution.sparkPlan, adaptivePlan)
+ }
+
+ /**
+ * Sets all SQL configurations specified in `pairs`, calls `f`, and then
restores all SQL
+ * configurations.
+ */
+ protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+ val conf = SQLConf.get
+ val (keys, values) = pairs.unzip
+ val currentValues = keys.map { key =>
+ if (conf.contains(key)) {
+ Some(conf.getConfString(key))
+ } else {
+ None
+ }
+ }
+ (keys, values).zipped.foreach { (k, v) =>
+ if (isStaticConfigKey(k)) {
+ throw new KyuubiException(s"Cannot modify the value of a static
config: $k")
+ }
+ conf.setConfString(k, v)
+ }
+ try f
+ finally {
+ keys.zip(currentValues).foreach {
+ case (key, Some(value)) => conf.setConfString(key, value)
+ case (key, None) => conf.unsetConf(key)
+ }
+ }
+ }
+
+ /**
+ * This method provides a reflection-based implementation of
[[SQLConf.isStaticConfigKey]] to
+ * adapt Spark-3.1.x
+ *
+ * TODO: Once we drop support for Spark 3.1.x, we can directly call
+ * [[SQLConf.isStaticConfigKey()]].
+ */
+ private def isStaticConfigKey(key: String): Boolean = {
+ val staticConfKeys = DynFields.builder()
+ .hiddenImpl(SQLConf.getClass, "staticConfKeys")
+ .build[java.util.Set[String]](SQLConf)
Review Comment:
   nit: the recommended reference is `JSet` or `util.Set` (rather than the fully-qualified `java.util.Set`)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]