dongjoon-hyun commented on a change in pull request #23694: [SPARK-24360][SQL]
Support Hive 3.1 metastore
URL: https://github.com/apache/spark/pull/23694#discussion_r252527908
##########
File path:
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
##########
@@ -1179,3 +1180,128 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
private[client] class Shim_v2_2 extends Shim_v2_1
private[client] class Shim_v2_3 extends Shim_v2_1
+
+private[client] class Shim_v3_1 extends Shim_v2_3 {
+ // Spark supports only non-ACID operations
+ protected lazy val isAcidIUDoperation = JBoolean.FALSE
+
+ // Writer ID can be 0 for non-ACID operations
+ protected lazy val writeIdInLoadTableOrPartition: JLong = 0L
+
+ // Statement ID
+ protected lazy val stmtIdInLoadTableOrPartition: JInteger = 0
+
+ protected lazy val listBucketingLevel: JInteger = 0
+
+ private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass(
+ "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")
+
+ private lazy val loadPartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "loadPartition",
+ classOf[Path],
+ classOf[Table],
+ classOf[JMap[String, String]],
+ clazzLoadFileType,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ classOf[JLong],
+ JInteger.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadTableMethod =
+ findMethod(
+ classOf[Hive],
+ "loadTable",
+ classOf[Path],
+ classOf[String],
+ clazzLoadFileType,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ classOf[JLong],
+ JInteger.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadDynamicPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "loadDynamicPartitions",
+ classOf[Path],
+ classOf[String],
+ classOf[JMap[String, String]],
+ clazzLoadFileType,
+ JInteger.TYPE,
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ JLong.TYPE,
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ classOf[AcidUtils.Operation],
+ JBoolean.TYPE)
+
+ override def loadPartition(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ val session = SparkSession.getActiveSession
+ assert(session.nonEmpty)
+ val database = session.get.sessionState.catalog.getCurrentDatabase
+ val table = hive.getTable(database, tableName)
+ val loadFileType = if (replace) {
+
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+ } else {
+
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+ }
+ assert(loadFileType.isDefined)
+ loadPartitionMethod.invoke(hive, loadPath, table, partSpec,
loadFileType.get,
+ inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+ isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask,
+ writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace:
JBoolean)
+ }
+
+ override def loadTable(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ replace: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ val loadFileType = if (replace) {
+
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+ } else {
+
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+ }
+ assert(loadFileType.isDefined)
+ loadTableMethod.invoke(hive, loadPath, tableName, loadFileType.get,
isSrcLocal: JBoolean,
+ isSkewedStoreAsSubdir, isAcidIUDoperation, hasFollowingStatsTask,
+ writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition: JInteger,
replace: JBoolean)
+ }
+
+ override def loadDynamicPartitions(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ numDP: Int,
+ listBucketingEnabled: Boolean): Unit = {
+ val loadFileType = if (replace) {
+
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+ } else {
+
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+ }
+ assert(loadFileType.isDefined)
+ loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec,
loadFileType.get,
+ numDP: JInteger, listBucketingLevel, isAcid,
writeIdInLoadTableOrPartition,
+ stmtIdInLoadTableOrPartition, hasFollowingStatsTask,
AcidUtils.Operation.NOT_ACID,
+ replace: JBoolean)
+ }
Review comment:
It's possible, but it doesn't help much. As we see
[here](https://github.com/apache/spark/pull/23694/files#diff-6fd847124f8eae45ba2de1cf7d6296feL855),
`Hive.getIndexes` raises `NoSuchMethodError` before we call `shim.dropIndex`.
- [Hive.java in
2.3](https://github.com/apache/hive/blob/branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L3703-L3712)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org