sandeep-katta opened a new pull request #29649:
URL: https://github.com/apache/spark/pull/29649
### What changes were proposed in this pull request?
There is no need to use the database name in the `loadPartition` API of `Shim_v3_0` to get the Hive table: Hive already provides an overloaded method that returns the table from the table name alone. With this approach, the dependency on `SessionCatalog` can be removed from the shim layer.
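As a minimal sketch of the idea (the helper names here are hypothetical; the real `Shim_v3_0.loadPartition` takes many more parameters and lives inside Spark's `org.apache.spark.sql.hive.client` package, which is why it can reach the internal `sessionState`):
```scala
import org.apache.hadoop.hive.ql.metadata.{Hive, Table}
import org.apache.spark.sql.SparkSession

// Before: the database name was resolved through Spark's SessionCatalog.
// getCurrentDatabase synchronizes on the (Hive)SessionCatalog instance,
// which is the lock edge that completes the deadlock cycle shown below.
def getTableViaSessionCatalog(hive: Hive, tableName: String): Table = {
  val catalog = SparkSession.getActiveSession.get.sessionState.catalog
  hive.getTable(catalog.getCurrentDatabase, tableName)
}

// After: Hive's single-argument getTable overload resolves the current
// database from its own SessionState, so no Spark catalog lock is taken.
def getTableDirectly(hive: Hive, tableName: String): Table =
  hive.getTable(tableName)
```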
### Why are the changes needed?
To avoid a deadlock when communicating with Hive metastore 3.1.x. `Shim_v3_0.loadPartition` calls `SessionCatalog.getCurrentDatabase` while the calling thread already holds the `HiveExternalCatalog` lock, while a concurrent thread can take the same two locks in the opposite order, as the thread dump below shows:
```
Found one Java-level deadlock:
=============================
"worker3":
  waiting to lock monitor 0x00007faf0be602b8 (object 0x00000007858f85f0, a org.apache.spark.sql.hive.HiveSessionCatalog),
  which is held by "worker0"
"worker0":
  waiting to lock monitor 0x00007faf0be5fc88 (object 0x0000000785c15c80, a org.apache.spark.sql.hive.HiveExternalCatalog),
  which is held by "worker3"

Java stack information for the threads listed above:
===================================================
"worker3":
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCurrentDatabase(SessionCatalog.scala:256)
    - waiting to lock <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
    at org.apache.spark.sql.hive.client.Shim_v3_0.loadPartition(HiveShim.scala:1332)
    at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadPartition$1(HiveClientImpl.scala:870)
    at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$4459/1387095575.apply$mcV$sp(Unknown Source)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
    at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$2227/313239499.apply(Unknown Source)
    at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
    at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
    - locked <0x0000000785ef9d78> (a org.apache.spark.sql.hive.client.IsolatedClientLoader)
    at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
    at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:860)
    at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadPartition$1(HiveExternalCatalog.scala:911)
    at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$4457/2037578495.apply$mcV$sp(Unknown Source)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
    - locked <0x0000000785c15c80> (a org.apache.spark.sql.hive.HiveExternalCatalog)
    at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:890)
    at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.loadPartition(ExternalCatalogWithListener.scala:179)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:512)
    at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    - locked <0x00000007b1690ff8> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
    at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
    at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
    at java.lang.Thread.run(Thread.java:748)
"worker0":
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
    - waiting to lock <0x0000000785c15c80> (a org.apache.spark.sql.hive.HiveExternalCatalog)
    at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:851)
    at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.tableExists(ExternalCatalogWithListener.scala:146)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:432)
    - locked <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:185)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:509)
    at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    - locked <0x00000007b529af58> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
    at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
    at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
    at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.
```
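The cycle is the classic inconsistent lock-ordering bug: worker3 holds the `HiveExternalCatalog` monitor and waits for the `HiveSessionCatalog` monitor, while worker0 holds and waits for them in the opposite order. A self-contained Scala sketch of the same pattern (the lock objects here are stand-ins, not Spark classes):
```scala
object LockOrderingDeadlock extends App {
  // Stand-ins for the two monitors involved in the real deadlock.
  val externalCatalog = new Object // plays HiveExternalCatalog
  val sessionCatalog  = new Object // plays HiveSessionCatalog

  // worker3's order: externalCatalog (withClient), then sessionCatalog
  // (getCurrentDatabase, reached via Shim_v3_0.loadPartition).
  val worker3 = new Thread(() => {
    externalCatalog.synchronized {
      Thread.sleep(100) // widen the race window so each thread holds one lock
      sessionCatalog.synchronized { println("worker3 done") }
    }
  }, "worker3")

  // worker0's order: sessionCatalog (tableExists), then externalCatalog
  // (withClient); the reverse order closes the cycle.
  val worker0 = new Thread(() => {
    sessionCatalog.synchronized {
      Thread.sleep(100)
      externalCatalog.synchronized { println("worker0 done") }
    }
  }, "worker0")

  worker3.start(); worker0.start()
  worker3.join(); worker0.join() // never returns: Java-level deadlock
}
```
Because each thread sleeps while holding its first lock, both reliably reach the inner `synchronized` block and block forever; removing the `getCurrentDatabase` call from the shim removes the `HiveExternalCatalog -> HiveSessionCatalog` edge entirely.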
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested by executing the below script in spark-shell; no deadlock was found.
Launch spark-shell using `./bin/spark-shell --conf "spark.sql.hive.metastore.jars=maven" --conf spark.sql.hive.metastore.version=3.1 --conf spark.hadoop.datanucleus.schema.autoCreateAll=true`
**code**
```
def testHiveDeadLock(): Unit = {
  import scala.collection.mutable.ArrayBuffer
  import scala.util.Random
  println("test hive DeadLock")
  spark.sql("drop database if exists testDeadLock cascade")
  spark.sql("create database testDeadLock")
  spark.sql("use testDeadLock")
  val tableCount = 100
  val tableNamePrefix = "testdeadlock"
  for (i <- 0 until tableCount) {
    val tableName = s"$tableNamePrefix${i + 1}"
    spark.sql(s"drop table if exists $tableName")
    spark.sql(s"create table $tableName (a bigint) partitioned by (b bigint) stored as orc")
  }

  // One thread per table, each loading a partition concurrently.
  val threads = new ArrayBuffer[Thread]
  for (i <- 0 until tableCount) {
    threads.append(new Thread(new Runnable {
      override def run(): Unit = {
        val tableName = s"$tableNamePrefix${i + 1}"
        val rand = Random
        val df = spark.range(0, 20000).toDF("a")
        val location = s"/tmp/${rand.nextLong.abs}"
        df.write.mode("overwrite").orc(location)
        spark.sql(
          s"LOAD DATA LOCAL INPATH '$location' INTO TABLE $tableName partition (b=$i)")
      }
    }, s"worker$i"))
    threads(i).start()
  }

  for (i <- 0 until tableCount) {
    println(s"Joining with thread $i")
    threads(i).join()
  }
  for (i <- 0 until tableCount) {
    val tableName = s"$tableNamePrefix${i + 1}"
    spark.sql(s"select count(*) from $tableName").show(false)
  }
  println("All done")
}

for (i <- 0 until 100) {
  testHiveDeadLock()
  println(s"completed ${i}th iteration")
}
```
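While the script is running, the deadlock can be confirmed or ruled out with a standard JDK thread dump (for example `jstack <pid>` or `kill -3 <pid>` against the spark-shell process): the JVM's built-in detector prints a "Found one Java-level deadlock" report like the one above whenever the lock cycle occurs.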