aakash-db commented on code in PR #51003:
URL: https://github.com/apache/spark/pull/51003#discussion_r2116680394


##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala:
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.utils
+
+import java.io.{BufferedReader, File, FileNotFoundException, InputStreamReader}
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+import scala.util.{Failure, Try}
+import scala.util.control.NonFatal
+
+import org.scalactic.source
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Tag}
+import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.matchers.should.Matchers
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Column, QueryTest, Row, TypedColumn}
+import org.apache.spark.sql.SparkSession.{clearActiveSession, setActiveSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.{DataFrame, Dataset, SparkSession, 
SQLContext}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.pipelines.utils.PipelineTest.{cleanupMetastore, 
createTempDir}
+
+abstract class PipelineTest
+    extends SparkFunSuite
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with Matchers
+//    with SQLImplicits
+    with SparkErrorTestMixin
+    with TargetCatalogAndSchemaMixin
+    with Logging {
+
+  final protected val storageRoot = createTempDir()
+
+  var spark: SparkSession = createAndInitializeSpark()
+  val originalSpark: SparkSession = spark.cloneSession()
+
+  implicit def sqlContext: SQLContext = spark.sqlContext
+  def sql(text: String): DataFrame = spark.sql(text)
+
+  /**
+   * Spark confs for [[originalSpark]]. Spark confs set here will be the 
default spark confs for
+   * all spark sessions created in tests.
+   */
+  protected def sparkConf: SparkConf = {
+    var conf = new SparkConf()
+      .set("spark.sql.shuffle.partitions", "2")
+      .set("spark.sql.session.timeZone", "UTC")
+
+    if (schemaInPipelineSpec.isDefined) {
+      conf = conf.set("pipelines.schema", schemaInPipelineSpec.get)
+    }
+
+    if (Option(System.getenv("ENABLE_SPARK_UI")).exists(s => 
java.lang.Boolean.valueOf(s))) {
+      conf = conf.set("spark.ui.enabled", "true")
+    }
+    conf
+  }
+
+  /** Returns the dataset name in the event log. */
+  protected def eventLogName(
+      name: String,
+      catalog: Option[String] = catalogInPipelineSpec,
+      schema: Option[String] = schemaInPipelineSpec,
+      isView: Boolean = false
+  ): String = {
+    fullyQualifiedIdentifier(name, catalog, schema, isView).unquotedString
+  }
+
+  /** Returns the fully qualified identifier. */
+  protected def fullyQualifiedIdentifier(
+      name: String,
+      catalog: Option[String] = catalogInPipelineSpec,
+      schema: Option[String] = schemaInPipelineSpec,
+      isView: Boolean = false
+  ): TableIdentifier = {
+    if (isView) {
+      TableIdentifier(name)
+    } else {
+      TableIdentifier(
+        catalog = catalog,
+        database = schema,
+        table = name
+      )
+    }
+  }
+
+//  /** Returns the [[PipelineApiConf]] constructed from the current spark 
session */
+//  def pipelineApiConf: PipelineApiConf = PipelineApiConf.instance
+
+  /**
+   * Runs the given function with the given spark conf, and resets the conf 
after the function
+   * completes.
+   */
+  def withSparkConfs[T](confs: Map[String, String])(f: => T): T = {
+    val originalConfs = confs.keys.map(k => k -> spark.conf.getOption(k)).toMap
+    confs.foreach { case (k, v) => spark.conf.set(k, v) }
+    try f
+    finally originalConfs.foreach {
+      case (k, v) =>
+        v match {
+          case Some(v) => spark.conf.set(k, v)
+          case None => spark.conf.unset(k)
+        }
+    }
+  }
+
+  /**
+   * This exists temporarily for compatibility with tests that become invalid 
when multiple
+   * executors are available.
+   */
+  protected def master = "local[*]"
+
+  /** Creates and returns an initialized spark session. */
+  def createAndInitializeSpark(): SparkSession = {
+    val newSparkSession = SparkSession
+      .builder()
+      .config(sparkConf)
+      .master(master)
+      .getOrCreate()
+    newSparkSession
+  }
+
+  /** Set up the spark session before each test. */
+  protected def initializeSparkBeforeEachTest(): Unit = {
+    clearActiveSession()
+    spark = originalSpark.newSession()
+    setActiveSession(spark)
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    initializeSparkBeforeEachTest()
+    cleanupMetastore(spark)
+    (catalogInPipelineSpec, schemaInPipelineSpec) match {
+      case (Some(catalog), Some(schema)) =>
+        sql(s"CREATE SCHEMA IF NOT EXISTS `$catalog`.`$schema`")
+      case _ =>
+        schemaInPipelineSpec.foreach(s => sql(s"CREATE SCHEMA IF NOT EXISTS 
`$s`"))
+    }
+  }
+
+  override def afterEach(): Unit = {
+    cleanupMetastore(spark)
+    super.afterEach()
+  }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+  }
+
+  protected def gridTest[A](testNamePrefix: String, testTags: Tag*)(params: 
Seq[A])(
+      testFun: A => Unit): Unit = {
+    namedGridTest(testNamePrefix, testTags: _*)(params.map(a => a.toString -> 
a).toMap)(testFun)
+  }
+
+  override def test(testName: String, testTags: Tag*)(testFun: => Any /* 
Assertion */ )(
+      implicit pos: source.Position): Unit = super.test(testName, testTags: 
_*) {
+    runWithInstrumentation(testFun)
+  }
+
+  /**
+   * Adds custom instrumentation for tests.
+   *
+   * This instrumentation runs after `beforeEach` and
+   * before `afterEach` which lets us instrument the state of a test and its 
environment
+   * after any setup and before any clean-up done for a test.
+   */
+  private def runWithInstrumentation(testFunc: => Any): Any = {
+    try {
+      testFunc
+    } catch {
+      case e: TestFailedDueToTimeoutException =>
+//        val stackTraces = StackTraceReporter.dumpAllStackTracesToString()
+//        logInfo(
+//          s"""
+//             |Triggering thread dump since test failed with a timeout 
exception:
+//             |$stackTraces
+//             |""".stripMargin
+//        )
+        throw e
+    }
+  }
+
+  /**
+   * Creates individual tests for all items in [[params]].
+   *
+   * The full test name will be "<testNamePrefix> (<paramName> = <param>)" 
where <param> is one
+   * item in [[params]].
+   *
+   * @param testNamePrefix The test name prefix.
+   * @param paramName A descriptive name for the parameter.
+   * @param testTags Extra tags for the test.
+   * @param params The list of parameters for which to generate tests.
+   * @param testFun The actual test function. This function will be called 
with one argument of
+   *                type [[A]].
+   * @tparam A The type of the params.
+   */
+  protected def gridTest[A](testNamePrefix: String, paramName: String, 
testTags: Tag*)(
+      params: Seq[A])(testFun: A => Unit): Unit =
+    namedGridTest(testNamePrefix, testTags: _*)(
+      params.map(a => s"$paramName = $a" -> a).toMap
+    )(testFun)
+
+  /**
+   * Specialized version of gridTest where the params are two boolean values - 
[[true]] and
+   * [[false]].
+   */
+  protected def booleanGridTest(testNamePrefix: String, paramName: String, 
testTags: Tag*)(
+      testFun: Boolean => Unit): Unit = {
+    gridTest(testNamePrefix, paramName, testTags: _*)(Seq(true, 
false))(testFun)
+  }
+
+  protected def namedGridTest[A](testNamePrefix: String, testTags: 
Tag*)(params: Map[String, A])(
+      testFun: A => Unit): Unit = {
+    for (param <- params) {
+      test(testNamePrefix + s" (${param._1})", testTags: _*)(testFun(param._2))
+    }
+  }
+
+  protected def namedGridIgnore[A](testNamePrefix: String, testTags: 
Tag*)(params: Map[String, A])(
+      testFun: A => Unit): Unit = {
+    for (param <- params) {
+      ignore(testNamePrefix + s" (${param._1})", testTags: 
_*)(testFun(param._2))
+    }
+  }
+
+  /**
+   * Returns a [[Seq]] of JARs generated by compiling this test.
+   *
+   * Includes a "delta-pipelines-repo" to ensure the export_test, which 
compiles differently,
+   * still succeeds.
+   */
+  protected def getTestJars: Seq[String] =
+    getUniqueAbsoluteTestJarPaths.map(_.getName) :+ "delta-pipelines-repo"

Review Comment:
   Good catch. Nope; removed.



##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineTest.scala:
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.utils
+
+import java.io.{BufferedReader, File, FileNotFoundException, InputStreamReader}
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+import scala.util.{Failure, Try}
+import scala.util.control.NonFatal
+
+import org.scalactic.source
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Tag}
+import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.matchers.should.Matchers
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Column, QueryTest, Row, TypedColumn}
+import org.apache.spark.sql.SparkSession.{clearActiveSession, setActiveSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.{DataFrame, Dataset, SparkSession, 
SQLContext}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.pipelines.utils.PipelineTest.{cleanupMetastore, 
createTempDir}
+
+abstract class PipelineTest
+    extends SparkFunSuite
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with Matchers
+//    with SQLImplicits
+    with SparkErrorTestMixin
+    with TargetCatalogAndSchemaMixin
+    with Logging {
+
+  final protected val storageRoot = createTempDir()
+
+  var spark: SparkSession = createAndInitializeSpark()
+  val originalSpark: SparkSession = spark.cloneSession()
+
+  implicit def sqlContext: SQLContext = spark.sqlContext
+  def sql(text: String): DataFrame = spark.sql(text)
+
+  /**
+   * Spark confs for [[originalSpark]]. Spark confs set here will be the 
default spark confs for
+   * all spark sessions created in tests.
+   */
+  protected def sparkConf: SparkConf = {
+    var conf = new SparkConf()
+      .set("spark.sql.shuffle.partitions", "2")
+      .set("spark.sql.session.timeZone", "UTC")
+
+    if (schemaInPipelineSpec.isDefined) {
+      conf = conf.set("pipelines.schema", schemaInPipelineSpec.get)
+    }
+
+    if (Option(System.getenv("ENABLE_SPARK_UI")).exists(s => 
java.lang.Boolean.valueOf(s))) {
+      conf = conf.set("spark.ui.enabled", "true")
+    }
+    conf
+  }
+
+  /** Returns the dataset name in the event log. */
+  protected def eventLogName(
+      name: String,
+      catalog: Option[String] = catalogInPipelineSpec,
+      schema: Option[String] = schemaInPipelineSpec,
+      isView: Boolean = false
+  ): String = {
+    fullyQualifiedIdentifier(name, catalog, schema, isView).unquotedString
+  }
+
+  /** Returns the fully qualified identifier. */
+  protected def fullyQualifiedIdentifier(
+      name: String,
+      catalog: Option[String] = catalogInPipelineSpec,
+      schema: Option[String] = schemaInPipelineSpec,
+      isView: Boolean = false
+  ): TableIdentifier = {
+    if (isView) {
+      TableIdentifier(name)
+    } else {
+      TableIdentifier(
+        catalog = catalog,
+        database = schema,
+        table = name
+      )
+    }
+  }
+
+//  /** Returns the [[PipelineApiConf]] constructed from the current spark 
session */
+//  def pipelineApiConf: PipelineApiConf = PipelineApiConf.instance
+
+  /**
+   * Runs the given function with the given spark conf, and resets the conf 
after the function
+   * completes.
+   */
+  def withSparkConfs[T](confs: Map[String, String])(f: => T): T = {
+    val originalConfs = confs.keys.map(k => k -> spark.conf.getOption(k)).toMap
+    confs.foreach { case (k, v) => spark.conf.set(k, v) }
+    try f
+    finally originalConfs.foreach {
+      case (k, v) =>
+        v match {
+          case Some(v) => spark.conf.set(k, v)
+          case None => spark.conf.unset(k)
+        }
+    }
+  }
+
+  /**
+   * This exists temporarily for compatibility with tests that become invalid 
when multiple
+   * executors are available.
+   */
+  protected def master = "local[*]"
+
+  /** Creates and returns an initialized spark session. */
+  def createAndInitializeSpark(): SparkSession = {
+    val newSparkSession = SparkSession
+      .builder()
+      .config(sparkConf)
+      .master(master)
+      .getOrCreate()
+    newSparkSession
+  }
+
+  /** Set up the spark session before each test. */
+  protected def initializeSparkBeforeEachTest(): Unit = {
+    clearActiveSession()
+    spark = originalSpark.newSession()
+    setActiveSession(spark)
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    initializeSparkBeforeEachTest()
+    cleanupMetastore(spark)
+    (catalogInPipelineSpec, schemaInPipelineSpec) match {
+      case (Some(catalog), Some(schema)) =>
+        sql(s"CREATE SCHEMA IF NOT EXISTS `$catalog`.`$schema`")
+      case _ =>
+        schemaInPipelineSpec.foreach(s => sql(s"CREATE SCHEMA IF NOT EXISTS 
`$s`"))
+    }
+  }
+
+  override def afterEach(): Unit = {
+    cleanupMetastore(spark)
+    super.afterEach()
+  }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+  }
+
+  protected def gridTest[A](testNamePrefix: String, testTags: Tag*)(params: 
Seq[A])(
+      testFun: A => Unit): Unit = {
+    namedGridTest(testNamePrefix, testTags: _*)(params.map(a => a.toString -> 
a).toMap)(testFun)
+  }
+
+  override def test(testName: String, testTags: Tag*)(testFun: => Any /* 
Assertion */ )(
+      implicit pos: source.Position): Unit = super.test(testName, testTags: 
_*) {
+    runWithInstrumentation(testFun)
+  }
+
+  /**
+   * Adds custom instrumentation for tests.
+   *
+   * This instrumentation runs after `beforeEach` and
+   * before `afterEach` which lets us instrument the state of a test and its 
environment
+   * after any setup and before any clean-up done for a test.
+   */
+  private def runWithInstrumentation(testFunc: => Any): Any = {
+    try {
+      testFunc
+    } catch {
+      case e: TestFailedDueToTimeoutException =>
+//        val stackTraces = StackTraceReporter.dumpAllStackTracesToString()
+//        logInfo(
+//          s"""
+//             |Triggering thread dump since test failed with a timeout 
exception:
+//             |$stackTraces
+//             |""".stripMargin
+//        )
+        throw e
+    }
+  }
+
+  /**
+   * Creates individual tests for all items in [[params]].
+   *
+   * The full test name will be "<testNamePrefix> (<paramName> = <param>)" 
where <param> is one
+   * item in [[params]].
+   *
+   * @param testNamePrefix The test name prefix.
+   * @param paramName A descriptive name for the parameter.
+   * @param testTags Extra tags for the test.
+   * @param params The list of parameters for which to generate tests.
+   * @param testFun The actual test function. This function will be called 
with one argument of
+   *                type [[A]].
+   * @tparam A The type of the params.
+   */
+  protected def gridTest[A](testNamePrefix: String, paramName: String, 
testTags: Tag*)(
+      params: Seq[A])(testFun: A => Unit): Unit =
+    namedGridTest(testNamePrefix, testTags: _*)(
+      params.map(a => s"$paramName = $a" -> a).toMap
+    )(testFun)
+
+  /**
+   * Specialized version of gridTest where the params are two boolean values - 
[[true]] and
+   * [[false]].
+   */
+  protected def booleanGridTest(testNamePrefix: String, paramName: String, 
testTags: Tag*)(
+      testFun: Boolean => Unit): Unit = {
+    gridTest(testNamePrefix, paramName, testTags: _*)(Seq(true, 
false))(testFun)
+  }
+
+  protected def namedGridTest[A](testNamePrefix: String, testTags: 
Tag*)(params: Map[String, A])(
+      testFun: A => Unit): Unit = {
+    for (param <- params) {
+      test(testNamePrefix + s" (${param._1})", testTags: _*)(testFun(param._2))
+    }
+  }
+
+  protected def namedGridIgnore[A](testNamePrefix: String, testTags: 
Tag*)(params: Map[String, A])(
+      testFun: A => Unit): Unit = {
+    for (param <- params) {
+      ignore(testNamePrefix + s" (${param._1})", testTags: 
_*)(testFun(param._2))
+    }
+  }
+
+  /**
+   * Returns a [[Seq]] of JARs generated by compiling this test.
+   *
+   * Includes a "delta-pipelines-repo" to ensure the export_test, which 
compiles differently,
+   * still succeeds.
+   */
+  protected def getTestJars: Seq[String] =
+    getUniqueAbsoluteTestJarPaths.map(_.getName) :+ "delta-pipelines-repo"
+
+  /**
+   * Returns a [[Seq]] of absolute paths of all JAR files found in the
+   * current directory. See [[getUniqueAbsoluteTestJarPaths]].
+   */
+  protected def getTestJarPaths: Seq[String] =
+    getUniqueAbsoluteTestJarPaths.map(_.getAbsolutePath)
+
+  /**
+   * Returns a sequence of JARs found in the current directory. In a bazel 
test,

Review Comment:
   Removed this function; see above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to