dtenedor commented on code in PR #44084: URL: https://github.com/apache/spark/pull/44084#discussion_r1410315894
########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/JdbcConnection.scala: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.sql.{DriverManager, ResultSet} +import java.util.Properties + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry + +private[sql] trait JdbcConnection { + /** + * Runs the given query. + * @return A Seq[String] representing the output. + */ + def runQuery(query: String): Seq[String] + + /** + * Drop the table with the given table name. + */ + def dropTable(tableName: String): Unit + + /** + * Create a table with the given table name and schema. + */ + def createTable(tableName: String, schemaString: String): Unit + + /** + * Load data from the given Spark Dataframe into the table with given name. + */ + def loadData(df: DataFrame, tableName: String): Unit + + /** + * Close the connection. + */ + def close(): Unit +} + +private[sql] case class PostgresConnection(connection_url: Option[String] = None) Review Comment: please add a comment? 
########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/SQLQueryTestRunner.scala: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +/** + * Trait for classes that can run SQL queries for testing. + */ +trait SQLQueryTestRunner { + + /** + * Runs a given query and returns a Seq[String] that represents the query result output. Review Comment: please explain why it's a sequence? what are the components? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. 
After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg` OR `psql> CREATE role pg superuser` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for the entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], + testCase: TestCase, + configSet: Seq[(String, String)]): Unit = { + val localSparkSession = spark.newSession() + + var runner: Option[SQLQueryTestRunner] = None + val outputs: Seq[QueryTestOutput] = queries.map { sql => + val output = { + // Use the runner when generating golden files, and Spark when running the test against + // the already generated golden files. + if (regenerateGoldenFiles) { + if (runner.isEmpty) { + val connectionUrl = if (customConnectionUrl != null && customConnectionUrl.nonEmpty) { + Some(customConnectionUrl) + } else { + None + } + runner = Some(CrossDbmsQueryTestSuite.DBMS_TO_CONNECTION_MAPPING( + crossDbmsToGenerateGoldenFiles)(connectionUrl)) + } + val sparkDf = spark.sql(sql) + val output = runner.map(_.runQuery(sql)).get + // Use Spark analyzed plan to check if the query result is already semantically sorted + val result = if (isSemanticallySorted(sparkDf.queryExecution.analyzed)) { + output + } else { + // Sort the answer manually if it isn't sorted. + output.sorted + } + result + } else { + handleExceptions(getNormalizedQueryExecutionResult(localSparkSession, sql))._2 + } + } + // We do some query canonicalization now. + val executionOutput = ExecutionOutput( + sql = sql, + // Don't care about the schema for this test. Only care about correctness. 
+ schema = None, + output = normalizeTestResults(output.mkString("\n"))) + if (testCase.isInstanceOf[CTETest]) { + expandCTEQueryAndCompareResult(localSparkSession, sql, executionOutput) + } + executionOutput + } + runner.foreach(_.cleanUp()) + + if (regenerateGoldenFiles) { + val goldenOutput = { + s"-- Automatically generated by ${getClass.getSimpleName}\n" + + outputs.mkString("\n\n\n") + "\n" + } + val resultFile = new File(testCase.resultFile) + val parent = resultFile.getParentFile + if (!parent.exists()) { + assert(parent.mkdirs(), "Could not create directory: " + parent) + } + stringToFile(resultFile, goldenOutput) + } + + readGoldenFileAndCompareResults(testCase.resultFile, outputs, ExecutionOutput) + } + + override def createScalaTestCase(testCase: TestCase): Unit = { + if (ignoreList.exists(t => + testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) { + ignore(testCase.name) { + /* Do nothing */ Review Comment: maybe explicitly log that we are intentionally ignoring this filename? ########## sql/core/src/test/resources/sql-tests/inputs/postgres-crosstest/sqllogictest-select1.sql: ########## @@ -0,0 +1,90 @@ +-- First 6 queries of the sql-logic-tests with some minor changes for compatibility, available here: +-- https://www.sqlite.org/sqllogictest/file?name=test/select1.test&ci=tip. 
+ +CREATE VIEW t1(a, b, c, d, e) AS VALUES + (103,102,100,101,104), + (107,106,108,109,105), + (110,114,112,111,113), + (116,119,117,115,118), + (123,122,124,120,121), + (127,128,129,126,125), + (132,134,131,133,130), + (138,136,139,135,137), + (144,141,140,142,143), + (145,149,146,148,147), + (151,150,153,154,152), + (155,157,159,156,158), + (161,160,163,164,162), + (167,169,168,165,166), + (171,170,172,173,174), + (177,176,179,178,175), + (181,180,182,183,184), + (187,188,186,189,185), + (190,194,193,192,191), + (199,197,198,196,195), + (200,202,203,201,204), + (208,209,205,206,207), + (214,210,213,212,211), + (218,215,216,217,219), + (223,221,222,220,224), + (226,227,228,229,225), + (234,231,232,230,233), + (237,236,239,235,238), + (242,244,240,243,241), + (246,248,247,249,245); + +SELECT CASE WHEN c>(SELECT avg(c) FROM t1) THEN a*2 ELSE b*10 END + FROM t1 + ORDER BY 1; + +SELECT a+b*2+c*3+d*4+e*5, Review Comment: please fix formatting to e.g. add spaces before and after binary operators. ########## sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala: ########## @@ -612,9 +612,13 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper } } + protected def resultFileForInputFile(file: File): String = { Review Comment: please add a comment? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. 
After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg` OR `psql> CREATE role pg superuser` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for the entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass Review Comment: optional: you could make them `lazy val`s again to enforce that the result is only computed once. Up to you. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], + testCase: TestCase, + configSet: Seq[(String, String)]): Unit = { + val localSparkSession = spark.newSession() + + var runner: Option[SQLQueryTestRunner] = None + val outputs: Seq[QueryTestOutput] = queries.map { sql => + val output = { + // Use the runner when generating golden files, and Spark when running the test against + // the already generated golden files. + if (regenerateGoldenFiles) { + if (runner.isEmpty) { + val connectionUrl = if (customConnectionUrl != null && customConnectionUrl.nonEmpty) { + Some(customConnectionUrl) + } else { + None + } + runner = Some(CrossDbmsQueryTestSuite.DBMS_TO_CONNECTION_MAPPING( + crossDbmsToGenerateGoldenFiles)(connectionUrl)) + } + val sparkDf = spark.sql(sql) + val output = runner.map(_.runQuery(sql)).get + // Use Spark analyzed plan to check if the query result is already semantically sorted + val result = if (isSemanticallySorted(sparkDf.queryExecution.analyzed)) { + output + } else { + // Sort the answer manually if it isn't sorted. + output.sorted + } + result + } else { + handleExceptions(getNormalizedQueryExecutionResult(localSparkSession, sql))._2 + } + } + // We do some query canonicalization now. + val executionOutput = ExecutionOutput( + sql = sql, + // Don't care about the schema for this test. Only care about correctness. 
+ schema = None, + output = normalizeTestResults(output.mkString("\n"))) + if (testCase.isInstanceOf[CTETest]) { + expandCTEQueryAndCompareResult(localSparkSession, sql, executionOutput) + } + executionOutput + } + runner.foreach(_.cleanUp()) + + if (regenerateGoldenFiles) { + val goldenOutput = { + s"-- Automatically generated by ${getClass.getSimpleName}\n" + + outputs.mkString("\n\n\n") + "\n" + } + val resultFile = new File(testCase.resultFile) + val parent = resultFile.getParentFile + if (!parent.exists()) { + assert(parent.mkdirs(), "Could not create directory: " + parent) + } + stringToFile(resultFile, goldenOutput) + } + + readGoldenFileAndCompareResults(testCase.resultFile, outputs, ExecutionOutput) + } + + override def createScalaTestCase(testCase: TestCase): Unit = { + if (ignoreList.exists(t => + testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) { + ignore(testCase.name) { + /* Do nothing */ + } + } else { + testCase match { + case _: RegularTestCase => + // Create a test case to run this case. 
+ test(testCase.name) { + runSqlTestCase(testCase, listTestCases) + } + case _ => + ignore(s"Ignoring test cases that are not [[RegularTestCase]] for now") { + /* Do nothing */ + } + } + } + } + + override protected def resultFileForInputFile(file: File): String = { + val defaultResultsDir = new File(baseResourcePath, "results") + val goldenFilePath = new File( + defaultResultsDir, s"$crossDbmsToGenerateGoldenFiles-results").getAbsolutePath + file.getAbsolutePath.replace(inputFilePath, goldenFilePath) + ".out" + } + + override lazy val listTestCases: Seq[TestCase] = { + listFilesRecursively(new File(inputFilePath)).flatMap { file => + var resultFile = resultFileForInputFile(file) + // JDK-4511638 changes 'toString' result of Float/Double + // JDK-8282081 changes DataTimeFormatter 'F' symbol + if (Utils.isJavaVersionAtLeast21) { + if (new File(resultFile + ".java21").exists()) resultFile += ".java21" + } + val absPath = file.getAbsolutePath + val testCaseName = absPath.stripPrefix(inputFilePath).stripPrefix(File.separator) + RegularTestCase(testCaseName, absPath, resultFile) :: Nil + }.sortBy(_.name) + } + + private def isSemanticallySorted(plan: LogicalPlan): Boolean = plan match { + case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false + case _: DescribeCommandBase + | _: DescribeColumnCommand + | _: DescribeRelation + | _: DescribeColumn => true + case PhysicalOperation(_, _, Sort(_, true, _)) => true + case _ => plan.children.iterator.exists(isSemanticallySorted) + } + + // Ignore all tests for now due to likely incompatibility. Review Comment: How many tests do you think we will actually run with this mode later? Do you think an allowlist might make more sense than a denylist? For example, would just the golden files in the existing `postgres` directory maybe make sense for this? 
########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/JdbcConnection.scala: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.sql.{DriverManager, ResultSet} +import java.util.Properties + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry + +private[sql] trait JdbcConnection { + /** + * Runs the given query. + * @return A Seq[String] representing the output. + */ + def runQuery(query: String): Seq[String] + + /** + * Drop the table with the given table name. + */ + def dropTable(tableName: String): Unit + + /** + * Create a table with the given table name and schema. + */ + def createTable(tableName: String, schemaString: String): Unit + + /** + * Load data from the given Spark Dataframe into the table with given name. + */ + def loadData(df: DataFrame, tableName: String): Unit + + /** + * Close the connection. 
+ */ + def close(): Unit +} + +private[sql] case class PostgresConnection(connection_url: Option[String] = None) + extends JdbcConnection { + + DriverRegistry.register("org.postgresql.Driver") + private final val DEFAULT_USER = "pg" + private final val DEFAULT_CONNECTION_URL = + s"jdbc:postgresql://localhost:5432/postgres?user=$DEFAULT_USER" + private val url = connection_url.getOrElse(DEFAULT_CONNECTION_URL) + private val conn = DriverManager.getConnection(url) + private val stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + + def runQuery(query: String): Seq[String] = { + try { + val isResultSet = stmt.execute(query) + val rows = ArrayBuffer[Row]() + if (isResultSet) { + val rs = stmt.getResultSet + val metadata = rs.getMetaData + while (rs.next()) { + val row = Row.fromSeq((1 to metadata.getColumnCount).map(i => rs.getObject(i))) + rows.append(row) + } + } + rows.map(_.mkString("\t")).toSeq + } catch { + case e: Throwable => Seq(e.toString) + } + } + + def dropTable(tableName: String): Unit = { + val dropTableSql = s"DROP TABLE IF EXISTS $tableName" Review Comment: could this be vulnerable to SQL injection? Please add a comment describing why not? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/JdbcConnection.scala: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.sql.{DriverManager, ResultSet} +import java.util.Properties + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry + +private[sql] trait JdbcConnection { Review Comment: please add a comment saying what this represents? ########## sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala: ########## @@ -928,14 +932,18 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper output: String) extends QueryTestOutput { override def toString: String = { // We are explicitly not using multi-line string due to stripMargin removing "|" in output. + val schemaString = if (schema.nonEmpty) { + s"-- !query schema\n" + schema.get + "\n" + } else { + "" + } s"-- !query\n" + sql + "\n" + - s"-- !query schema\n" + - schema.get + "\n" + + schemaString + s"-- !query output\n" + output } - override def numSegments: Int = 3 + override def numSegments: Int = if (schema.isDefined) { 3 } else { 2 } Review Comment: please add a comment here describing the logic? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/JdbcConnection.scala: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.sql.{DriverManager, ResultSet} +import java.util.Properties + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry + +private[sql] trait JdbcConnection { + /** + * Runs the given query. + * @return A Seq[String] representing the output. + */ + def runQuery(query: String): Seq[String] + + /** + * Drop the table with the given table name. Review Comment: the method name implies this :) please give more information? Same for the other methods. ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/SQLQueryTestRunner.scala: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +/** + * Trait for classes that can run SQL queries for testing. Review Comment: the method name implies this :) please extend to describe what this trait specifically does? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/JdbcConnection.scala: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.sql.{DriverManager, ResultSet} +import java.util.Properties + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry + +private[sql] trait JdbcConnection { + /** + * Runs the given query. + * @return A Seq[String] representing the output. 
Review Comment: can you comment why this is a sequence of strings, rather than a single string? What are the components? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. 
Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass Review Comment: optional: you could make them `lazy val`s again to enforce that the result is only computed once. Up to you. 
########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/JdbcConnection.scala: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.sql.{DriverManager, ResultSet} +import java.util.Properties + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry + +private[sql] trait JdbcConnection { + /** + * Runs the given query. + * @return A Seq[String] representing the output. + */ + def runQuery(query: String): Seq[String] + + /** + * Drop the table with the given table name. + */ + def dropTable(tableName: String): Unit + + /** + * Create a table with the given table name and schema. + */ + def createTable(tableName: String, schemaString: String): Unit + + /** + * Load data from the given Spark Dataframe into the table with given name. + */ + def loadData(df: DataFrame, tableName: String): Unit + + /** + * Close the connection. 
+ */ + def close(): Unit +} + +private[sql] case class PostgresConnection(connection_url: Option[String] = None) + extends JdbcConnection { + + DriverRegistry.register("org.postgresql.Driver") + private final val DEFAULT_USER = "pg" + private final val DEFAULT_CONNECTION_URL = + s"jdbc:postgresql://localhost:5432/postgres?user=$DEFAULT_USER" + private val url = connection_url.getOrElse(DEFAULT_CONNECTION_URL) + private val conn = DriverManager.getConnection(url) + private val stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + + def runQuery(query: String): Seq[String] = { + try { + val isResultSet = stmt.execute(query) + val rows = ArrayBuffer[Row]() + if (isResultSet) { + val rs = stmt.getResultSet + val metadata = rs.getMetaData + while (rs.next()) { + val row = Row.fromSeq((1 to metadata.getColumnCount).map(i => rs.getObject(i))) + rows.append(row) + } + } + rows.map(_.mkString("\t")).toSeq Review Comment: why tabs? Maybe use spaces instead? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. 
After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], + testCase: TestCase, + configSet: Seq[(String, String)]): Unit = { + val localSparkSession = spark.newSession() + + var runner: Option[SQLQueryTestRunner] = None + val outputs: Seq[QueryTestOutput] = queries.map { sql => + val output = { + // Use the runner when generating golden files, and Spark when running the test against + // the already generated golden files. + if (regenerateGoldenFiles) { + if (runner.isEmpty) { + val connectionUrl = if (customConnectionUrl != null && customConnectionUrl.nonEmpty) { Review Comment: you can do `Option(customConnectionUrl).map { url => ... }` ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. 
After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. + private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") Review Comment: can you put this magic constant into a separate `val` with a comment saying what it is? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. 
After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], Review Comment: please indent these function params by +4 total spaces ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. 
After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. + private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { Review Comment: you can do `Option(userInputDbms).map { dbms => ... }` ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. 
After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. + private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") Review Comment: can you put this magic constant into a separate val with a comment saying what it is? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. 
+ * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], + testCase: TestCase, + configSet: Seq[(String, String)]): Unit = { + val localSparkSession = spark.newSession() + + var runner: Option[SQLQueryTestRunner] = None + val outputs: Seq[QueryTestOutput] = queries.map { sql => + val output = { + // Use the runner when generating golden files, and Spark when running the test against + // the already generated golden files. + if (regenerateGoldenFiles) { + if (runner.isEmpty) { + val connectionUrl = if (customConnectionUrl != null && customConnectionUrl.nonEmpty) { + Some(customConnectionUrl) + } else { + None + } + runner = Some(CrossDbmsQueryTestSuite.DBMS_TO_CONNECTION_MAPPING( + crossDbmsToGenerateGoldenFiles)(connectionUrl)) + } + val sparkDf = spark.sql(sql) + val output = runner.map(_.runQuery(sql)).get + // Use Spark analyzed plan to check if the query result is already semantically sorted + val result = if (isSemanticallySorted(sparkDf.queryExecution.analyzed)) { + output + } else { + // Sort the answer manually if it isn't sorted. + output.sorted + } + result + } else { + handleExceptions(getNormalizedQueryExecutionResult(localSparkSession, sql))._2 Review Comment: please split this into separate statements to clarify? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. 
+ * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], + testCase: TestCase, + configSet: Seq[(String, String)]): Unit = { + val localSparkSession = spark.newSession() + + var runner: Option[SQLQueryTestRunner] = None + val outputs: Seq[QueryTestOutput] = queries.map { sql => + val output = { + // Use the runner when generating golden files, and Spark when running the test against + // the already generated golden files. + if (regenerateGoldenFiles) { + if (runner.isEmpty) { + val connectionUrl = if (customConnectionUrl != null && customConnectionUrl.nonEmpty) { + Some(customConnectionUrl) + } else { + None + } + runner = Some(CrossDbmsQueryTestSuite.DBMS_TO_CONNECTION_MAPPING( + crossDbmsToGenerateGoldenFiles)(connectionUrl)) + } + val sparkDf = spark.sql(sql) + val output = runner.map(_.runQuery(sql)).get + // Use Spark analyzed plan to check if the query result is already semantically sorted + val result = if (isSemanticallySorted(sparkDf.queryExecution.analyzed)) { + output + } else { + // Sort the answer manually if it isn't sorted. + output.sorted + } + result + } else { + handleExceptions(getNormalizedQueryExecutionResult(localSparkSession, sql))._2 + } + } + // We do some query canonicalization now. + val executionOutput = ExecutionOutput( + sql = sql, + // Don't care about the schema for this test. Only care about correctness. 
+ schema = None, + output = normalizeTestResults(output.mkString("\n"))) + if (testCase.isInstanceOf[CTETest]) { + expandCTEQueryAndCompareResult(localSparkSession, sql, executionOutput) + } + executionOutput + } + runner.foreach(_.cleanUp()) + + if (regenerateGoldenFiles) { + val goldenOutput = { + s"-- Automatically generated by ${getClass.getSimpleName}\n" + + outputs.mkString("\n\n\n") + "\n" + } + val resultFile = new File(testCase.resultFile) + val parent = resultFile.getParentFile + if (!parent.exists()) { + assert(parent.mkdirs(), "Could not create directory: " + parent) + } + stringToFile(resultFile, goldenOutput) + } + + readGoldenFileAndCompareResults(testCase.resultFile, outputs, ExecutionOutput) + } + + override def createScalaTestCase(testCase: TestCase): Unit = { + if (ignoreList.exists(t => + testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) { + ignore(testCase.name) { + /* Do nothing */ + } + } else { + testCase match { + case _: RegularTestCase => + // Create a test case to run this case. + test(testCase.name) { + runSqlTestCase(testCase, listTestCases) + } + case _ => + ignore(s"Ignoring test cases that are not [[RegularTestCase]] for now") { + /* Do nothing */ Review Comment: Same, let's log it? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. 
After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], + testCase: TestCase, + configSet: Seq[(String, String)]): Unit = { + val localSparkSession = spark.newSession() + + var runner: Option[SQLQueryTestRunner] = None + val outputs: Seq[QueryTestOutput] = queries.map { sql => + val output = { + // Use the runner when generating golden files, and Spark when running the test against + // the already generated golden files. + if (regenerateGoldenFiles) { + if (runner.isEmpty) { + val connectionUrl = if (customConnectionUrl != null && customConnectionUrl.nonEmpty) { + Some(customConnectionUrl) + } else { + None + } + runner = Some(CrossDbmsQueryTestSuite.DBMS_TO_CONNECTION_MAPPING( + crossDbmsToGenerateGoldenFiles)(connectionUrl)) + } + val sparkDf = spark.sql(sql) + val output = runner.map(_.runQuery(sql)).get + // Use Spark analyzed plan to check if the query result is already semantically sorted + val result = if (isSemanticallySorted(sparkDf.queryExecution.analyzed)) { + output + } else { + // Sort the answer manually if it isn't sorted. + output.sorted + } + result + } else { + handleExceptions(getNormalizedQueryExecutionResult(localSparkSession, sql))._2 + } + } + // We do some query canonicalization now. + val executionOutput = ExecutionOutput( + sql = sql, + // Don't care about the schema for this test. Only care about correctness. 
+ schema = None, + output = normalizeTestResults(output.mkString("\n"))) + if (testCase.isInstanceOf[CTETest]) { Review Comment: please add a comment here describing why we care about CTE tests? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. 
Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], + testCase: TestCase, + configSet: Seq[(String, String)]): Unit = { + val localSparkSession = spark.newSession() + + var runner: Option[SQLQueryTestRunner] = None + val outputs: Seq[QueryTestOutput] = queries.map { sql => + val output = { + // Use the runner when generating golden files, and Spark when running the test against + // the already generated golden files. + if (regenerateGoldenFiles) { + if (runner.isEmpty) { + val connectionUrl = if (customConnectionUrl != null && customConnectionUrl.nonEmpty) { + Some(customConnectionUrl) + } else { + None + } + runner = Some(CrossDbmsQueryTestSuite.DBMS_TO_CONNECTION_MAPPING( + crossDbmsToGenerateGoldenFiles)(connectionUrl)) + } + val sparkDf = spark.sql(sql) + val output = runner.map(_.runQuery(sql)).get + // Use Spark analyzed plan to check if the query result is already semantically sorted + val result = if (isSemanticallySorted(sparkDf.queryExecution.analyzed)) { + output + } else { + // Sort the answer manually if it isn't sorted. + output.sorted + } + result + } else { + handleExceptions(getNormalizedQueryExecutionResult(localSparkSession, sql))._2 + } + } + // We do some query canonicalization now. + val executionOutput = ExecutionOutput( + sql = sql, + // Don't care about the schema for this test. Only care about correctness. 
+ schema = None, + output = normalizeTestResults(output.mkString("\n"))) + if (testCase.isInstanceOf[CTETest]) { + expandCTEQueryAndCompareResult(localSparkSession, sql, executionOutput) + } + executionOutput + } + runner.foreach(_.cleanUp()) + + if (regenerateGoldenFiles) { + val goldenOutput = { + s"-- Automatically generated by ${getClass.getSimpleName}\n" + + outputs.mkString("\n\n\n") + "\n" + } + val resultFile = new File(testCase.resultFile) + val parent = resultFile.getParentFile + if (!parent.exists()) { + assert(parent.mkdirs(), "Could not create directory: " + parent) + } + stringToFile(resultFile, goldenOutput) + } + + readGoldenFileAndCompareResults(testCase.resultFile, outputs, ExecutionOutput) + } + + override def createScalaTestCase(testCase: TestCase): Unit = { + if (ignoreList.exists(t => Review Comment: optional: you could just add this to the `testCase match` block below as the first case, to simplify the logic. ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. 
After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], + testCase: TestCase, + configSet: Seq[(String, String)]): Unit = { + val localSparkSession = spark.newSession() + + var runner: Option[SQLQueryTestRunner] = None + val outputs: Seq[QueryTestOutput] = queries.map { sql => + val output = { + // Use the runner when generating golden files, and Spark when running the test against + // the already generated golden files. + if (regenerateGoldenFiles) { + if (runner.isEmpty) { + val connectionUrl = if (customConnectionUrl != null && customConnectionUrl.nonEmpty) { + Some(customConnectionUrl) + } else { + None + } + runner = Some(CrossDbmsQueryTestSuite.DBMS_TO_CONNECTION_MAPPING( + crossDbmsToGenerateGoldenFiles)(connectionUrl)) + } + val sparkDf = spark.sql(sql) + val output = runner.map(_.runQuery(sql)).get + // Use Spark analyzed plan to check if the query result is already semantically sorted Review Comment: please update the comments to be complete sentences ending with punctuation (same everywhere else in this PR) ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. 
After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], + testCase: TestCase, + configSet: Seq[(String, String)]): Unit = { + val localSparkSession = spark.newSession() + + var runner: Option[SQLQueryTestRunner] = None + val outputs: Seq[QueryTestOutput] = queries.map { sql => + val output = { + // Use the runner when generating golden files, and Spark when running the test against + // the already generated golden files. + if (regenerateGoldenFiles) { + if (runner.isEmpty) { + val connectionUrl = if (customConnectionUrl != null && customConnectionUrl.nonEmpty) { + Some(customConnectionUrl) + } else { + None + } + runner = Some(CrossDbmsQueryTestSuite.DBMS_TO_CONNECTION_MAPPING( + crossDbmsToGenerateGoldenFiles)(connectionUrl)) + } + val sparkDf = spark.sql(sql) + val output = runner.map(_.runQuery(sql)).get + // Use Spark analyzed plan to check if the query result is already semantically sorted + val result = if (isSemanticallySorted(sparkDf.queryExecution.analyzed)) { + output + } else { + // Sort the answer manually if it isn't sorted. + output.sorted + } + result + } else { + handleExceptions(getNormalizedQueryExecutionResult(localSparkSession, sql))._2 + } + } + // We do some query canonicalization now. + val executionOutput = ExecutionOutput( + sql = sql, + // Don't care about the schema for this test. Only care about correctness. 
+ schema = None, + output = normalizeTestResults(output.mkString("\n"))) + if (testCase.isInstanceOf[CTETest]) { + expandCTEQueryAndCompareResult(localSparkSession, sql, executionOutput) + } + executionOutput + } + runner.foreach(_.cleanUp()) + + if (regenerateGoldenFiles) { + val goldenOutput = { + s"-- Automatically generated by ${getClass.getSimpleName}\n" + + outputs.mkString("\n\n\n") + "\n" + } + val resultFile = new File(testCase.resultFile) + val parent = resultFile.getParentFile + if (!parent.exists()) { + assert(parent.mkdirs(), "Could not create directory: " + parent) + } + stringToFile(resultFile, goldenOutput) + } + + readGoldenFileAndCompareResults(testCase.resultFile, outputs, ExecutionOutput) + } + + override def createScalaTestCase(testCase: TestCase): Unit = { + if (ignoreList.exists(t => + testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) { + ignore(testCase.name) { + /* Do nothing */ + } + } else { + testCase match { + case _: RegularTestCase => + // Create a test case to run this case. + test(testCase.name) { + runSqlTestCase(testCase, listTestCases) + } + case _ => + ignore(s"Ignoring test cases that are not [[RegularTestCase]] for now") { + /* Do nothing */ + } + } + } + } + + override protected def resultFileForInputFile(file: File): String = { + val defaultResultsDir = new File(baseResourcePath, "results") + val goldenFilePath = new File( + defaultResultsDir, s"$crossDbmsToGenerateGoldenFiles-results").getAbsolutePath + file.getAbsolutePath.replace(inputFilePath, goldenFilePath) + ".out" + } + + override lazy val listTestCases: Seq[TestCase] = { + listFilesRecursively(new File(inputFilePath)).flatMap { file => + var resultFile = resultFileForInputFile(file) + // JDK-4511638 changes 'toString' result of Float/Double Review Comment: is this code copied from somewhere? if so, please deduplicate it into one shared helper? 
########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. 
There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg`` OR `psql> CREATE role pg superuser`` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], + testCase: TestCase, + configSet: Seq[(String, String)]): Unit = { + val localSparkSession = spark.newSession() + + var runner: Option[SQLQueryTestRunner] = None + val outputs: Seq[QueryTestOutput] = queries.map { sql => + val output = { + // Use the runner when generating golden files, and Spark when running the test against + // the already generated golden files. + if (regenerateGoldenFiles) { + if (runner.isEmpty) { + val connectionUrl = if (customConnectionUrl != null && customConnectionUrl.nonEmpty) { + Some(customConnectionUrl) + } else { + None + } + runner = Some(CrossDbmsQueryTestSuite.DBMS_TO_CONNECTION_MAPPING( + crossDbmsToGenerateGoldenFiles)(connectionUrl)) + } + val sparkDf = spark.sql(sql) + val output = runner.map(_.runQuery(sql)).get + // Use Spark analyzed plan to check if the query result is already semantically sorted + val result = if (isSemanticallySorted(sparkDf.queryExecution.analyzed)) { + output + } else { + // Sort the answer manually if it isn't sorted. + output.sorted + } + result + } else { + handleExceptions(getNormalizedQueryExecutionResult(localSparkSession, sql))._2 + } + } + // We do some query canonicalization now. + val executionOutput = ExecutionOutput( + sql = sql, + // Don't care about the schema for this test. Only care about correctness. 
+ schema = None, + output = normalizeTestResults(output.mkString("\n"))) + if (testCase.isInstanceOf[CTETest]) { + expandCTEQueryAndCompareResult(localSparkSession, sql, executionOutput) + } + executionOutput + } + runner.foreach(_.cleanUp()) + + if (regenerateGoldenFiles) { + val goldenOutput = { + s"-- Automatically generated by ${getClass.getSimpleName}\n" + + outputs.mkString("\n\n\n") + "\n" + } + val resultFile = new File(testCase.resultFile) + val parent = resultFile.getParentFile + if (!parent.exists()) { + assert(parent.mkdirs(), "Could not create directory: " + parent) + } + stringToFile(resultFile, goldenOutput) + } + + readGoldenFileAndCompareResults(testCase.resultFile, outputs, ExecutionOutput) + } + + override def createScalaTestCase(testCase: TestCase): Unit = { + if (ignoreList.exists(t => + testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) { + ignore(testCase.name) { + /* Do nothing */ + } + } else { + testCase match { + case _: RegularTestCase => + // Create a test case to run this case. 
+ test(testCase.name) { + runSqlTestCase(testCase, listTestCases) + } + case _ => + ignore(s"Ignoring test cases that are not [[RegularTestCase]] for now") { + /* Do nothing */ + } + } + } + } + + override protected def resultFileForInputFile(file: File): String = { + val defaultResultsDir = new File(baseResourcePath, "results") + val goldenFilePath = new File( + defaultResultsDir, s"$crossDbmsToGenerateGoldenFiles-results").getAbsolutePath + file.getAbsolutePath.replace(inputFilePath, goldenFilePath) + ".out" + } + + override lazy val listTestCases: Seq[TestCase] = { + listFilesRecursively(new File(inputFilePath)).flatMap { file => + var resultFile = resultFileForInputFile(file) + // JDK-4511638 changes 'toString' result of Float/Double + // JDK-8282081 changes DataTimeFormatter 'F' symbol + if (Utils.isJavaVersionAtLeast21) { + if (new File(resultFile + ".java21").exists()) resultFile += ".java21" + } + val absPath = file.getAbsolutePath + val testCaseName = absPath.stripPrefix(inputFilePath).stripPrefix(File.separator) + RegularTestCase(testCaseName, absPath, resultFile) :: Nil + }.sortBy(_.name) + } + + private def isSemanticallySorted(plan: LogicalPlan): Boolean = plan match { Review Comment: please add a comment. Are you sure this logic doesn't already exist somewhere? Please look in SQLQueryTestSuite? It seems if we want to do this here, we probably want to do this for all the other golden file testing as well. (I agree with the idea, it makes test cases more deterministic.) ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DescribeColumn, DescribeRelation, Distinct, Generate, Join, LogicalPlan, Sample, Sort} +import org.apache.spark.sql.catalyst.util.stringToFile +import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeCommandBase} +import org.apache.spark.util.Utils + +// scalastyle:off line.size.limit +/** + * See SQLQueryTestSuite.scala for more information. This class builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. Note that the + * input directory path is currently limited because most, if not all, of our current SQL query + * tests will not be compatible with other DBMSes. There will be more work in the future, such as + * some kind of conversion, to increase coverage. + * + * If your SQL query test is not compatible with other DBMSes, please add it to the `ignoreList` at + * the bottom of this file. + * + * You need to have a database server up before running this test. + * For example, for postgres: + * 1. Install PostgreSQL. + * a. On a mac: `brew install postgresql@13` + * 2. 
+ After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s pg` OR `psql> CREATE role pg superuser` + * + * To run the entire test suite: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate golden files for the entire suite, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + * + * To re-generate the golden file for a single test, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite -- -z describe.sql" + * }}} + * + * To specify a DBMS to use (the default is postgres): + * {{{ + * REF_DBMS=mysql SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.CrossDbmsQueryTestSuite" + * }}} + */ +// scalastyle:on line.size.limit +class CrossDbmsQueryTestSuite extends SQLQueryTestSuite with Logging { + + // Note: the below two functions have to be functions instead of variables because the superclass + // runs the test first before the subclass variables can be instantiated. 
+ private def crossDbmsToGenerateGoldenFiles: String = { + val userInputDbms = System.getenv("REF_DBMS") + if (userInputDbms != null && userInputDbms.nonEmpty) { + assert(CrossDbmsQueryTestSuite.SUPPORTED_DBMS.contains(userInputDbms), + s"$userInputDbms is not currently supported.") + userInputDbms + } else { + CrossDbmsQueryTestSuite.DEFAULT_DBMS + } + } + private def customConnectionUrl: String = System.getenv("REF_DBMS_CONNECTION_URL") + + override protected def runQueries( + queries: Seq[String], + testCase: TestCase, + configSet: Seq[(String, String)]): Unit = { + val localSparkSession = spark.newSession() + + var runner: Option[SQLQueryTestRunner] = None + val outputs: Seq[QueryTestOutput] = queries.map { sql => + val output = { + // Use the runner when generating golden files, and Spark when running the test against + // the already generated golden files. + if (regenerateGoldenFiles) { + if (runner.isEmpty) { + val connectionUrl = if (customConnectionUrl != null && customConnectionUrl.nonEmpty) { + Some(customConnectionUrl) + } else { + None + } + runner = Some(CrossDbmsQueryTestSuite.DBMS_TO_CONNECTION_MAPPING( + crossDbmsToGenerateGoldenFiles)(connectionUrl)) + } + val sparkDf = spark.sql(sql) + val output = runner.map(_.runQuery(sql)).get + // Use Spark analyzed plan to check if the query result is already semantically sorted + val result = if (isSemanticallySorted(sparkDf.queryExecution.analyzed)) { + output + } else { + // Sort the answer manually if it isn't sorted. + output.sorted + } + result + } else { + handleExceptions(getNormalizedQueryExecutionResult(localSparkSession, sql))._2 + } + } + // We do some query canonicalization now. + val executionOutput = ExecutionOutput( + sql = sql, + // Don't care about the schema for this test. Only care about correctness. 
+ schema = None, + output = normalizeTestResults(output.mkString("\n"))) + if (testCase.isInstanceOf[CTETest]) { + expandCTEQueryAndCompareResult(localSparkSession, sql, executionOutput) + } + executionOutput + } + runner.foreach(_.cleanUp()) + + if (regenerateGoldenFiles) { + val goldenOutput = { + s"-- Automatically generated by ${getClass.getSimpleName}\n" + + outputs.mkString("\n\n\n") + "\n" + } + val resultFile = new File(testCase.resultFile) + val parent = resultFile.getParentFile + if (!parent.exists()) { + assert(parent.mkdirs(), "Could not create directory: " + parent) + } + stringToFile(resultFile, goldenOutput) + } + + readGoldenFileAndCompareResults(testCase.resultFile, outputs, ExecutionOutput) + } + + override def createScalaTestCase(testCase: TestCase): Unit = { + if (ignoreList.exists(t => + testCase.name.toLowerCase(Locale.ROOT).contains(t.toLowerCase(Locale.ROOT)))) { + ignore(testCase.name) { + /* Do nothing */ + } + } else { + testCase match { + case _: RegularTestCase => + // Create a test case to run this case. 
+ test(testCase.name) { + runSqlTestCase(testCase, listTestCases) + } + case _ => + ignore(s"Ignoring test cases that are not [[RegularTestCase]] for now") { + /* Do nothing */ + } + } + } + } + + override protected def resultFileForInputFile(file: File): String = { + val defaultResultsDir = new File(baseResourcePath, "results") + val goldenFilePath = new File( + defaultResultsDir, s"$crossDbmsToGenerateGoldenFiles-results").getAbsolutePath + file.getAbsolutePath.replace(inputFilePath, goldenFilePath) + ".out" + } + + override lazy val listTestCases: Seq[TestCase] = { + listFilesRecursively(new File(inputFilePath)).flatMap { file => + var resultFile = resultFileForInputFile(file) + // JDK-4511638 changes 'toString' result of Float/Double + // JDK-8282081 changes DataTimeFormatter 'F' symbol + if (Utils.isJavaVersionAtLeast21) { + if (new File(resultFile + ".java21").exists()) resultFile += ".java21" + } + val absPath = file.getAbsolutePath + val testCaseName = absPath.stripPrefix(inputFilePath).stripPrefix(File.separator) + RegularTestCase(testCaseName, absPath, resultFile) :: Nil + }.sortBy(_.name) + } + + private def isSemanticallySorted(plan: LogicalPlan): Boolean = plan match { + case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false + case _: DescribeCommandBase + | _: DescribeColumnCommand + | _: DescribeRelation + | _: DescribeColumn => true + case PhysicalOperation(_, _, Sort(_, true, _)) => true + case _ => plan.children.iterator.exists(isSemanticallySorted) + } + + // Ignore all tests for now due to likely incompatibility. + override def ignoreList: Set[String] = super.ignoreList ++ Set( + "postgreSQL", Review Comment: please put the directories in a separate `Set[String]`, and sort the directories and .sql filenames alphabetically. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
