sandeep-katta opened a new pull request #29649:
URL: https://github.com/apache/spark/pull/29649


   ### What changes were proposed in this pull request?
   
   No need of using database name in `loadPartition` API of Shim_v3_0 to get 
the hive table, already in hive there is a overloaded method which gives hive 
table using table name. Using this approach dependency on `SessionCatalog` can 
be in  removed in Shim layer
   
   ### Why are the changes needed?
   To avoid deadlock when communicating with Hive metastore 3.1.x
   ```
   Found one Java-level deadlock:
   =============================
   "worker3":
     waiting to lock monitor 0x00007faf0be602b8 (object 0x00000007858f85f0, a 
org.apache.spark.sql.hive.HiveSessionCatalog),
     which is held by "worker0"
   "worker0":
     waiting to lock monitor 0x00007faf0be5fc88 (object 0x0000000785c15c80, a 
org.apache.spark.sql.hive.HiveExternalCatalog),
     which is held by "worker3"
   
   Java stack information for the threads listed above:
   ===================================================
   "worker3":
     at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCurrentDatabase(SessionCatalog.scala:256)
     - waiting to lock <0x00000007858f85f0> (a 
org.apache.spark.sql.hive.HiveSessionCatalog)
     at 
org.apache.spark.sql.hive.client.Shim_v3_0.loadPartition(HiveShim.scala:1332)
     at 
org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadPartition$1(HiveClientImpl.scala:870)
     at 
org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$4459/1387095575.apply$mcV$sp(Unknown
 Source)
     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
     at 
org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
     at 
org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$2227/313239499.apply(Unknown
 Source)
     at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
     at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
     - locked <0x0000000785ef9d78> (a 
org.apache.spark.sql.hive.client.IsolatedClientLoader)
     at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
     at 
org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:860)
     at 
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadPartition$1(HiveExternalCatalog.scala:911)
     at 
org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$4457/2037578495.apply$mcV$sp(Unknown
 Source)
     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
     at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
     - locked <0x0000000785c15c80> (a 
org.apache.spark.sql.hive.HiveExternalCatalog)
     at 
org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:890)
     at 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.loadPartition(ExternalCatalogWithListener.scala:179)
     at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:512)
     at 
org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
     - locked <0x00000007b1690ff8> (a 
org.apache.spark.sql.execution.command.ExecutedCommandExec)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
     at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
     at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown 
Source)
     at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
     at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown 
Source)
     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
     at 
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown
 Source)
     at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
     at 
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown
 Source)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
     at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
     at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
     at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
     at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown 
Source)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
     at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
     at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown 
Source)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
     at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
     at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
     at java.lang.Thread.run(Thread.java:748)
   "worker0":
     at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
     - waiting to lock <0x0000000785c15c80
     > (a org.apache.spark.sql.hive.HiveExternalCatalog)
     at 
org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:851)
     at 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.tableExists(ExternalCatalogWithListener.scala:146)
     at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:432)
     - locked <0x00000007858f85f0> (a 
org.apache.spark.sql.hive.HiveSessionCatalog)
     at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:185)
     at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:509)
     at 
org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
     - locked <0x00000007b529af58> (a 
org.apache.spark.sql.execution.command.ExecutedCommandExec)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
     at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
     at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown 
Source)
     at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
     at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown 
Source)
     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
     at 
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown
 Source)
     at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
     at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
     at 
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown
 Source)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
     at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
     at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
     at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
     at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown 
Source)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
     at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
     at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown 
Source)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
     at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
     at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
     at java.lang.Thread.run(Thread.java:748)
   
   Found 1 deadlock. 
   ```
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Tested using below script by executing in spark-shell and I found no dead 
lock
   
   launch spark-shell using ./bin/spark-shell --conf 
"spark.sql.hive.metastore.jars=maven" --conf 
spark.sql.hive.metastore.version=3.1 --conf 
spark.hadoop.datanucleus.schema.autoCreateAll=true
   
   **code**
   ```
   def testHiveDeadLock = {
         import scala.collection.mutable.ArrayBuffer
         import scala.util.Random
         println("test hive DeadLock")
         spark.sql("drop database if exists testDeadLock cascade")
         spark.sql("create database testDeadLock")
         spark.sql("use testDeadLock")
         val tableCount = 100
         val tableNamePrefix = "testdeadlock"
         for (i <- 0 until tableCount) {
           val tableName = s"$tableNamePrefix${i + 1}"
           spark.sql(s"drop table if exists $tableName")
           spark.sql(s"create table $tableName (a bigint) partitioned by (b 
bigint) stored as orc")
         }
   
         val threads = new ArrayBuffer[Thread]
         for (i <- 0 until tableCount) {
           threads.append(new Thread( new Runnable {
             override def run: Unit = {
               val tableName = s"$tableNamePrefix${i + 1}"
               val rand = Random
               val df = spark.range(0, 20000).toDF("a")
               val location = s"/tmp/${rand.nextLong.abs}"
               df.write.mode("overwrite").orc(location)
               spark.sql(
                 s"""
           LOAD DATA LOCAL INPATH '$location' INTO TABLE $tableName partition 
(b=$i)""")
             }
           }, s"worker$i"))
           threads(i).start()
         }
   
         for (i <- 0 until tableCount) {
           println(s"Joining with thread $i")
           threads(i).join()
         }
         for (i <- 0 until tableCount) {
           val tableName = s"$tableNamePrefix${i + 1}"
           spark.sql(s"select count(*) from $tableName").show(false)
         }
         println("All done")
       }
   
       for(i <- 0 until 100) {
         testHiveDeadLock
         println(s"completed {$i}th iteration")
       }
     }
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to