dtenedor commented on code in PR #44084: URL: https://github.com/apache/spark/pull/44084#discussion_r1434597887
########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/SQLQueryTestRunner.scala: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +/** + * Trait for classes that can run SQL queries for testing. This is specifically used as a wrapper + * around a connection to a system that can run SQL queries, to run queries from + * [[SQLQueryTestSuite]]. + */ +trait SQLQueryTestRunner { Review Comment: we already have the existing "trait SQLQueryTestHelper", can we use that instead? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile} +import org.apache.spark.util.ArrayImplicits.SparkArrayOps + +// scalastyle:off line.size.limit +/** + * IF YOU ADDED A NEW SQL TEST AND THIS SUITE IS FAILING, READ THIS: + * Your new SQL test is automatically opted into this suite. It is likely failing because it is not + * compatible with the default DBMS (currently postgres). You have two options: + * 1. (Recommended) Modify your queries to be compatible with both systems, and generate golden + * files with the instructions below. This is recommended because it will run your queries + * against postgres, providing higher correctness testing confidence, and you won't have to + * manually verify the golden files generated with your test. + * 2. Add this line to your .sql file: --ONLY_IF spark + * + * To re-generate golden files for entire suite, either run: + * 1. (Recommended) You need Docker on your machine. Install Docker and run the following command: + * {{{ + * bash ./bin/generate_golden_files_with_postgres.sh + * }}} + * 2. + * a. You need to have a Postgres server up before running this test. + * i. Install PostgreSQL. On a mac: `brew install postgresql@13` + * ii. 
After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s postgres` OR `psql> CREATE role postgres superuser` + * b. Run the following command: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.PostgreSQLQueryTestSuite" + * }}} + * + * To indicate that the SQL file is not eligible for testing with this suite, add the following + * comment into the input file: + * {{{ + * --ONLY_IF spark + * }}} + * + * And then, to run the entire test suite, with the default cross DBMS: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.PostgreSQLQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, e.g. `describe.sql`, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.PostgreSQLQueryTestSuite -- -z describe.sql" + * }}} + */ + +class PostgreSQLQueryTestSuite extends CrossDbmsQueryTestSuite { + + protected def crossDbmsToGenerateGoldenFiles: String = CrossDbmsQueryTestSuite.POSTGRES + + // Reduce scope to subquery tests for now. That is where most correctness issues are. + override protected def customInputFilePath: String = new File(inputFilePath, "subquery").getAbsolutePath + + override protected def getConnection: Option[String] => JdbcSQLQueryTestRunner = + (connection_url: Option[String]) => JdbcSQLQueryTestRunner(PostgresConnection(connection_url)) + + override protected def preprocessingCommands = Seq( + // Custom function `double` to imitate Spark's function, so that more tests are covered. + """ + |CREATE OR REPLACE FUNCTION double(numeric_value numeric) RETURNS double precision + | AS 'select CAST($1 AS double precision);' + | LANGUAGE SQL + | IMMUTABLE + | RETURNS NULL ON NULL INPUT; + |""".stripMargin + ) +} + +/** + * See SQLQueryTestSuite.scala for more information. 
This suite builds off of that to allow us + * to generate golden files with other DBMS to perform cross-checking for correctness. It generates + * another set of golden files. Note that this is not currently run on all SQL input files by + * default because there is incompatibility between SQL dialects for Spark and the other DBMS. + * + * This suite adds a new comment argument, --ONLY_IF. This comment is used to indicate the DBMS for + * which the SQL file is eligible. These strings are defined in the companion object. For + * example, if you have a SQL file named `describe.sql`, and you want to indicate that Postgres is + * incompatible, add the following comment into the input file: + * --ONLY_IF spark Review Comment: This could be simpler by just inferring whether to test against e.g. Postgres or not based on the presence of the corresponding golden file in its associated directory instead. ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/JdbcConnection.scala: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.crossdbms + +import java.sql.{DriverManager, ResultSet} +import java.util.Properties + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry + +private[sql] trait JdbcConnection { + /** + * Runs the given query. + * @return A Seq[String] representing the output. + */ + def runQuery(query: String): Seq[String] + + /** + * Drop the table with the given table name. + */ + def dropTable(tableName: String): Unit + + /** + * Create a table with the given table name and schema. + */ + def createTable(tableName: String, schemaString: String): Unit + + /** + * Load data from the given Spark Dataframe into the table with given name. + */ + def loadData(df: DataFrame, tableName: String): Unit + + /** + * Close the connection. + */ + def close(): Unit +} + +private[sql] case class PostgresConnection(connection_url: Option[String] = None) + extends JdbcConnection { + + DriverRegistry.register("org.postgresql.Driver") + private final val DEFAULT_USER = "pg" + private final val DEFAULT_CONNECTION_URL = + s"jdbc:postgresql://localhost:5432/postgres?user=$DEFAULT_USER" + private val url = connection_url.getOrElse(DEFAULT_CONNECTION_URL) + private val conn = DriverManager.getConnection(url) + private val stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + + def runQuery(query: String): Seq[String] = { + try { + val isResultSet = stmt.execute(query) + val rows = ArrayBuffer[Row]() + if (isResultSet) { + val rs = stmt.getResultSet + val metadata = rs.getMetaData + while (rs.next()) { + val row = Row.fromSeq((1 to metadata.getColumnCount).map(i => rs.getObject(i))) + rows.append(row) + } + } + rows.map(_.mkString("\t")).toSeq Review Comment: OK, no problem ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/CrossDbmsQueryTestSuite.scala: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.io.File +import java.util.Locale + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLQueryTestSuite +import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile} +import org.apache.spark.util.ArrayImplicits.SparkArrayOps + +// scalastyle:off line.size.limit +/** + * IF YOU ADDED A NEW SQL TEST AND THIS SUITE IS FAILING, READ THIS: + * Your new SQL test is automatically opted into this suite. It is likely failing because it is not + * compatible with the default DBMS (currently postgres). You have two options: + * 1. (Recommended) Modify your queries to be compatible with both systems, and generate golden + * files with the instructions below. This is recommended because it will run your queries + * against postgres, providing higher correctness testing confidence, and you won't have to + * manually verify the golden files generated with your test. + * 2. Add this line to your .sql file: --ONLY_IF spark + * + * To re-generate golden files for entire suite, either run: + * 1. (Recommended) You need Docker on your machine. 
Install Docker and run the following command: + * {{{ + * bash ./bin/generate_golden_files_with_postgres.sh + * }}} + * 2. + * a. You need to have a Postgres server up before running this test. + * i. Install PostgreSQL. On a mac: `brew install postgresql@13` + * ii. After installing PostgreSQL, start the database server, then create a role named pg with + * superuser permissions: `createuser -s postgres` OR `psql> CREATE role postgres superuser` + * b. Run the following command: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.PostgreSQLQueryTestSuite" + * }}} + * + * To indicate that the SQL file is not eligible for testing with this suite, add the following + * comment into the input file: + * {{{ + * --ONLY_IF spark + * }}} + * + * And then, to run the entire test suite, with the default cross DBMS: + * {{{ + * build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.PostgreSQLQueryTestSuite" + * }}} + * + * To re-generate golden file for a single test, e.g. `describe.sql`, run: + * {{{ + * SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.crossdbms.PostgreSQLQueryTestSuite -- -z describe.sql" + * }}} + */ + +class PostgreSQLQueryTestSuite extends CrossDbmsQueryTestSuite { Review Comment: can you put this in a separate file, please? ########## sql/core/src/test/scala/org/apache/spark/sql/crossdbms/JdbcConnection.scala: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.crossdbms + +import java.sql.{DriverManager, ResultSet} +import java.util.Properties + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry + +/** + * Represents a connection (session) to a database using JDBC. + */ +private[crossdbms] trait JdbcConnection { + + /** + * Executes the given SQL query and returns the result as a sequence of strings. + * @return A Seq[String] representing the output, where each element represents a single row. + */ + def runQuery(query: String): Seq[String] + + /** + * Drops the table with the specified table name. + */ + def dropTable(tableName: String): Unit + + /** + * Creates a table with the specified name and schema. + * @param schemaString The schema definition for the table. Note that this may vary depending on + * the database system. + */ + def createTable(tableName: String, schemaString: String): Unit + + /** + * Loads data from the given Spark DataFrame into the table with the specified name. + * @param df The Spark DataFrame containing the data to be loaded. Review Comment: please fix indentation here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
