This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 145e169a2c8b  [SPARK-46694][SQL][TESTS] Drop the assumptions of 'hive version < 2.0' in Hive version related tests
145e169a2c8b is described below

commit 145e169a2c8bdf9798e7ec38be2a51e44ad90e88
Author: Kent Yao <y...@apache.org>
AuthorDate: Fri Jan 12 15:28:00 2024 +0800

    [SPARK-46694][SQL][TESTS] Drop the assumptions of 'hive version < 2.0' in Hive version related tests

    ### What changes were proposed in this pull request?

    As a code clean-up following SPARK-45328, this PR drops the assumptions and workarounds in Hive version-related tests that only apply to Hive versions < 2.0.

    ### Why are the changes needed?

    To clean up dead code.

    ### Does this PR introduce _any_ user-facing change?

    no

    ### How was this patch tested?

    SPARK_TEST_HIVE_CLIENT_VERSIONS=3.1 ./build/sbt -Phive -Phive-thriftserver "testOnly *HiveClientSuite*"

    ### Was this patch authored or co-authored using generative AI tooling?

    no

    Closes #44700 from yaooqinn/SPARK-46694.

    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../spark/sql/hive/client/HiveClientSuite.scala    | 255 +++++----------
 .../spark/sql/hive/client/HiveClientSuites.scala   |   2 +-
 .../hive/client/HivePartitionFilteringSuite.scala  |   5 +-
 3 files changed, 60 insertions(+), 202 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index aceca829df8b..0bc288501a01 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPermanentFunctionException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -38,8 +38,7 @@ import org.apache.spark.sql.hive.test.TestHiveVersion
 import org.apache.spark.sql.types.{IntegerType, StructType}
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-class HiveClientSuite(version: String, allVersions: Seq[String])
-  extends HiveVersionSuite(version) {
+class HiveClientSuite(version: String) extends HiveVersionSuite(version) {
 
   private var versionSpark: TestHiveVersion = null
 
@@ -103,31 +102,29 @@ class HiveClientSuite(version: String, allVersions: Seq[String])
   }
 
   test("create/get/alter database should pick right user name as owner") {
-    if (version != "0.12") {
-      val currentUser = UserGroupInformation.getCurrentUser.getUserName
-      val ownerName = "SPARK_29425"
-      val db1 = "SPARK_29425_1"
-      val db2 = "SPARK_29425_2"
-      val ownerProps = Map("owner" -> ownerName)
-
-      // create database with owner
-      val dbWithOwner = CatalogDatabase(db1, "desc", Utils.createTempDir().toURI, ownerProps)
-      client.createDatabase(dbWithOwner, ignoreIfExists = true)
-      val getDbWithOwner = client.getDatabase(db1)
-      assert(getDbWithOwner.properties("owner") === ownerName)
-      // alter database without owner
-      client.alterDatabase(getDbWithOwner.copy(properties = Map()))
-      assert(client.getDatabase(db1).properties("owner") === "")
-
-      // create database without owner
-      val dbWithoutOwner = CatalogDatabase(db2, "desc", Utils.createTempDir().toURI, Map())
-      client.createDatabase(dbWithoutOwner, ignoreIfExists = true)
-      val getDbWithoutOwner = client.getDatabase(db2)
-      assert(getDbWithoutOwner.properties("owner") === currentUser)
-      // alter database with owner
-      client.alterDatabase(getDbWithoutOwner.copy(properties = ownerProps))
-      assert(client.getDatabase(db2).properties("owner") === ownerName)
-    }
+    val currentUser = UserGroupInformation.getCurrentUser.getUserName
+    val ownerName = "SPARK_29425"
+    val db1 = "SPARK_29425_1"
+    val db2 = "SPARK_29425_2"
+    val ownerProps = Map("owner" -> ownerName)
+
+    // create database with owner
+    val dbWithOwner = CatalogDatabase(db1, "desc", Utils.createTempDir().toURI, ownerProps)
+    client.createDatabase(dbWithOwner, ignoreIfExists = true)
+    val getDbWithOwner = client.getDatabase(db1)
+    assert(getDbWithOwner.properties("owner") === ownerName)
+    // alter database without owner
+    client.alterDatabase(getDbWithOwner.copy(properties = Map()))
+    assert(client.getDatabase(db1).properties("owner") === "")
+
+    // create database without owner
+    val dbWithoutOwner = CatalogDatabase(db2, "desc", Utils.createTempDir().toURI, Map())
+    client.createDatabase(dbWithoutOwner, ignoreIfExists = true)
+    val getDbWithoutOwner = client.getDatabase(db2)
+    assert(getDbWithoutOwner.properties("owner") === currentUser)
+    // alter database with owner
+    client.alterDatabase(getDbWithoutOwner.copy(properties = ownerProps))
+    assert(client.getDatabase(db2).properties("owner") === ownerName)
   }
 
   test("createDatabase with null description") {
@@ -336,30 +333,10 @@ class HiveClientSuite(version: String, allVersions: Seq[String])
   }
 
   test("dropTable") {
-    val versionsWithoutPurge =
-      if (allVersions.contains("0.14")) allVersions.takeWhile(_ != "0.14") else Nil
-    // First try with the purge option set. This should fail if the version is < 0.14, in which
-    // case we check the version and try without it.
-    try {
-      client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
-        purge = true)
-      assert(!versionsWithoutPurge.contains(version))
-    } catch {
-      case _: UnsupportedOperationException =>
-        assert(versionsWithoutPurge.contains(version))
-        client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
-          purge = false)
-    }
-    // Drop table with type CatalogTableType.VIEW.
-    try {
-      client.dropTable("default", tableName = "view1", ignoreIfNotExists = false,
-        purge = true)
-      assert(!versionsWithoutPurge.contains(version))
-    } catch {
-      case _: UnsupportedOperationException =>
-        client.dropTable("default", tableName = "view1", ignoreIfNotExists = false,
-          purge = false)
-    }
+    client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
+      purge = true)
+    client.dropTable("default", tableName = "view1", ignoreIfNotExists = false,
+      purge = true)
     assert(client.listTables("default") === Seq("src"))
   }
 
@@ -416,13 +393,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String])
     // Only one partition [1, 1] for key2 == 1
     val result = client.getPartitionsByFilter(client.getRawHiveTable("default", "src_part"),
      Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))
-
-    // Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition.
-    if (version != "0.12") {
-      assert(result.size == 1)
-    } else {
-      assert(result.size == testPartitionCount)
-    }
+    assert(result.size == 1)
   }
 
   test("getPartition") {
@@ -488,10 +459,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String])
     val spec = Map("key1" -> "1", "key2" -> "2")
     val parameters = Map(StatsSetupConst.TOTAL_SIZE -> "0", StatsSetupConst.NUM_FILES -> "1")
     val newLocation = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
-    val storage = storageFormat.copy(
-      locationUri = Some(newLocation),
-      // needed for 0.12 alter partitions
-      serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+    val storage = storageFormat.copy(locationUri = Some(newLocation))
     val partition = CatalogTablePartition(spec, storage, parameters)
     client.alterPartitions("default", "src_part", Seq(partition))
     assert(client.getPartition("default", "src_part", spec)
@@ -502,21 +470,8 @@
 
   test("dropPartitions") {
     val spec = Map("key1" -> "1", "key2" -> "3")
-    val versionsWithoutPurge =
-      if (allVersions.contains("1.2")) allVersions.takeWhile(_ != "1.2") else Nil
-    // Similar to dropTable; try with purge set, and if it fails, make sure we're running
-    // with a version that is older than the minimum (1.2 in this case).
-    try {
-      client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
-        purge = true, retainData = false)
-      assert(!versionsWithoutPurge.contains(version))
-    } catch {
-      case _: UnsupportedOperationException =>
-        assert(versionsWithoutPurge.contains(version))
-        client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
-          purge = false, retainData = false)
-    }
-
+    client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
+      purge = true, retainData = false)
     assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
   }
 
@@ -555,92 +510,42 @@
 
   test("createFunction") {
    val functionClass = "org.apache.spark.MyFunc1"
-    if (version == "0.12") {
-      // Hive 0.12 doesn't support creating permanent functions
-      intercept[AnalysisException] {
-        client.createFunction("default", function("func1", functionClass))
-      }
-    } else {
-      client.createFunction("default", function("func1", functionClass))
-    }
+    client.createFunction("default", function("func1", functionClass))
   }
 
   test("functionExists") {
-    if (version == "0.12") {
-      // Hive 0.12 doesn't allow customized permanent functions
-      assert(!client.functionExists("default", "func1"))
-    } else {
-      assert(client.functionExists("default", "func1"))
-    }
+    assert(client.functionExists("default", "func1"))
   }
 
   test("renameFunction") {
-    if (version == "0.12") {
-      // Hive 0.12 doesn't allow customized permanent functions
-      intercept[NoSuchPermanentFunctionException] {
-        client.renameFunction("default", "func1", "func2")
-      }
-    } else {
-      client.renameFunction("default", "func1", "func2")
-      assert(client.functionExists("default", "func2"))
-    }
+    client.renameFunction("default", "func1", "func2")
+    assert(client.functionExists("default", "func2"))
  }
 
   test("alterFunction") {
     val functionClass = "org.apache.spark.MyFunc2"
-    if (version == "0.12") {
-      // Hive 0.12 doesn't allow customized permanent functions
-      intercept[NoSuchPermanentFunctionException] {
-        client.alterFunction("default", function("func2", functionClass))
-      }
-    } else {
-      client.alterFunction("default", function("func2", functionClass))
-    }
function("func2", functionClass)) } test("getFunction") { - if (version == "0.12") { - // Hive 0.12 doesn't allow customized permanent functions - intercept[NoSuchPermanentFunctionException] { - client.getFunction("default", "func2") - } - } else { - // No exception should be thrown - val func = client.getFunction("default", "func2") - assert(func.className == "org.apache.spark.MyFunc2") - } + // No exception should be thrown + val func = client.getFunction("default", "func2") + assert(func.className == "org.apache.spark.MyFunc2") } test("getFunctionOption") { - if (version == "0.12") { - // Hive 0.12 doesn't allow customized permanent functions - assert(client.getFunctionOption("default", "func2").isEmpty) - } else { - assert(client.getFunctionOption("default", "func2").isDefined) - assert(client.getFunctionOption("default", "the_func_not_exists").isEmpty) - } + assert(client.getFunctionOption("default", "func2").isDefined) + assert(client.getFunctionOption("default", "the_func_not_exists").isEmpty) } test("listFunctions") { - if (version == "0.12") { - // Hive 0.12 doesn't allow customized permanent functions - assert(client.listFunctions("default", "fun.*").isEmpty) - } else { - assert(client.listFunctions("default", "fun.*").size == 1) - } + assert(client.listFunctions("default", "fun.*").size == 1) } test("dropFunction") { - if (version == "0.12") { - // Hive 0.12 doesn't support creating permanent functions - intercept[NoSuchPermanentFunctionException] { - client.dropFunction("default", "func2") - } - } else { - // No exception should be thrown - client.dropFunction("default", "func2") - assert(client.listFunctions("default", "fun.*").isEmpty) - } + // No exception should be thrown + client.dropFunction("default", "func2") + assert(client.listFunctions("default", "fun.*").isEmpty) } /////////////////////////////////////////////////////////////////////////// @@ -740,12 +645,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1))) val tableMeta = versionSpark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl")) val totalSize = tableMeta.stats.map(_.sizeInBytes) - // Except 0.12, all the following versions will fill the Hive-generated statistics - if (version == "0.12") { - assert(totalSize.isEmpty) - } else { - assert(totalSize.nonEmpty && totalSize.get > 0) - } + assert(totalSize.nonEmpty && totalSize.get > 0) } } @@ -764,12 +664,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters val totalSize = partMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) val numFiles = partMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong) - // Except 0.12, all the following versions will fill the Hive-generated statistics - if (version == "0.12") { - assert(totalSize.isEmpty && numFiles.isEmpty) - } else { - assert(totalSize.nonEmpty && numFiles.nonEmpty) - } + assert(totalSize.nonEmpty && numFiles.nonEmpty) versionSpark.sql( """ @@ -781,12 +676,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) val newTotalSize = newPartMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) val newNumFiles = newPartMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong) - // Except 0.12, all the following versions will fill the Hive-generated statistics - if (version == "0.12") { - assert(newTotalSize.isEmpty && newNumFiles.isEmpty) - } else { - assert(newTotalSize.nonEmpty && newNumFiles.nonEmpty) - } + assert(newTotalSize.nonEmpty && 
+      assert(newTotalSize.nonEmpty && newNumFiles.nonEmpty)
     }
   }
 
@@ -808,10 +698,7 @@
         val filePaths = dir.map(_.getName).toList
         folders.flatMap(listFiles) ++: filePaths
       }
-      // expect 2 files left: `.part-00000-random-uuid.crc` and `part-00000-random-uuid`
-      // 0.12, 0.13, 1.0 and 1.1 also has another two more files ._SUCCESS.crc and _SUCCESS
-      val metadataFiles = Seq("._SUCCESS.crc", "_SUCCESS")
-      assert(listFiles(tmpDir).filterNot(metadataFiles.contains).length == 2)
+      assert(listFiles(tmpDir).length === 2)
     }
   }
 }
@@ -941,42 +828,16 @@
         """.stripMargin
       )
 
-      val errorMsg = "Cannot safely cast 'f0': decimal(2,1) to binary"
-
       if (isPartitioned) {
         val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3"
-        if (version == "0.12" || version == "0.13") {
-          checkError(
-            exception = intercept[AnalysisException](versionSpark.sql(insertStmt)),
-            errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
-            parameters = Map(
-              "tableName" -> "`spark_catalog`.`default`.`tab1`",
-              "colName" -> "`f0`",
-              "srcType" -> "\"DECIMAL(2,1)\"",
-              "targetType" -> "\"BINARY\"")
-          )
-        } else {
-          versionSpark.sql(insertStmt)
-          assert(versionSpark.table(tableName).collect() ===
-            versionSpark.sql("SELECT 1.30, 'a'").collect())
-        }
+        versionSpark.sql(insertStmt)
+        assert(versionSpark.table(tableName).collect() ===
+          versionSpark.sql("SELECT 1.30, 'a'").collect())
       } else {
         val insertStmt = s"INSERT OVERWRITE TABLE $tableName SELECT 1.3"
-        if (version == "0.12" || version == "0.13") {
-          checkError(
-            exception = intercept[AnalysisException](versionSpark.sql(insertStmt)),
-            errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
-            parameters = Map(
-              "tableName" -> "`spark_catalog`.`default`.`tab1`",
-              "colName" -> "`f0`",
-              "srcType" -> "\"DECIMAL(2,1)\"",
-              "targetType" -> "\"BINARY\"")
-          )
-        } else {
-          versionSpark.sql(insertStmt)
-          assert(versionSpark.table(tableName).collect() ===
-            versionSpark.sql("SELECT 1.30").collect())
-        }
+        versionSpark.sql(insertStmt)
+        assert(versionSpark.table(tableName).collect() ===
+          versionSpark.sql("SELECT 1.30").collect())
       }
     }
   }
@@ -1025,7 +886,7 @@
 
   test("SPARK-17920: Insert into/overwrite avro table") {
     // skipped because it's failed in the condition on Windows
-    assume(!(Utils.isWindows && version == "0.12"))
+    assume(!Utils.isWindows)
     withTempDir { dir =>
       val avroSchema =
         """
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuites.scala
index b172c0dfedc9..015078f269f9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuites.scala
@@ -91,6 +91,6 @@ class HiveClientSuites extends SparkFunSuite with HiveClientVersions {
   }
 
   override def nestedSuites: IndexedSeq[Suite] = {
-    versions.map(new HiveClientSuite(_, versions))
+    versions.map(new HiveClientSuite(_))
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
index 48a6c6a2be65..1a4eb7554789 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
@@ -49,13 +49,10 @@ class HivePartitionFilteringSuite(version: String)
   private val fallbackKey = SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key
   private val pruningFastFallback = SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key
 
-  // Support default partition in metastoredirectsql since HIVE-11898(Hive 2.0.0).
-  private val defaultPartition = if (version >= "2.0") Some(DEFAULT_PARTITION_NAME) else None
-
   private val dsValue = 20170101 to 20170103
   private val hValue = 0 to 4
   private val chunkValue = Seq("aa", "ab", "ba", "bb")
-  private val dateValue = Seq("2019-01-01", "2019-01-02", "2019-01-03") ++ defaultPartition
+  private val dateValue = Seq("2019-01-01", "2019-01-02", "2019-01-03", DEFAULT_PARTITION_NAME)
   private val dateStrValue = Seq("2020-01-01", "2020-01-02", "2020-01-03", "20200104", "20200105")
   private val timestampStrValue = Seq("2021-01-01 00:00:00", "2021-01-02 00:00:00")
   private val testPartitionCount =

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org