HeartSaVioR opened a new pull request #26316: [SPARK-29604][SQL][2.4] Force 
initialize SessionState before initializing HiveClient in SparkSQLEnv
URL: https://github.com/apache/spark/pull/26316
 
 
   ### What changes were proposed in this pull request?
   
   This patch fixes the issue that external listeners are not initialized 
properly when `spark.sql.hive.metastore.jars` is set to either "maven" or 
custom list of jar.
   ("builtin" is not a case here - all jars in Spark classloader are also 
available in separate classloader)
   
   The culprit is lazy initialization (lazy val or passing builder function) & 
thread context classloader. HiveClient leverages IsolatedClientLoader to 
properly load Hive and relevant libraries without issue - to not mess up with 
Spark classpath it uses separate classloader with leveraging thread context 
classloader.
   
   But there's a messed-up case - SessionState is being initialized while 
HiveClient changed the thread context classloader from Spark classloader to 
Hive isolated one, and streaming query listeners are loaded from changed 
classloader while initializing SessionState.
   
   This patch forces initializing SessionState in SparkSQLEnv to avoid such 
case.
   
   ### Why are the changes needed?
   
   ClassNotFoundException could occur in spark-sql with specific configuration, 
as explained above.
   
   ### Does this PR introduce any user-facing change?
   
   No, as I don't think end users assume the classloader of external listeners 
is only containing jars for Hive client.
   
   ### How was this patch tested?
   
   New UT added which fails on master branch and passes with the patch.
   
   The error message with master branch when running UT:
   
   ```
   java.lang.IllegalArgumentException: Error while instantiating 
'org.apache.spark.sql.hive.HiveSessionStateBuilder':;
   org.apache.spark.sql.AnalysisException: java.lang.IllegalArgumentException: 
Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':;
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:109)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:221)
        at 
org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:147)
        at 
org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:137)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:59)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.$anonfun$new$2(SparkSQLEnvSuite.scala:44)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.withSystemProperties(SparkSQLEnvSuite.scala:61)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.$anonfun$new$1(SparkSQLEnvSuite.scala:43)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
        at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
        at 
org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
        at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
        at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
        at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
        at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
        at 
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
        at 
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
        at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
        at 
org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
        at 
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
        at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
        at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
        at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
        at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
        at org.scalatest.Suite.run(Suite.scala:1124)
        at org.scalatest.Suite.run$(Suite.scala:1106)
        at 
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
        at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
        at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
        at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
        at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
        at 
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
        at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
        at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
        at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
        at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
        at 
org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1349)
        at 
org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1343)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
        at 
org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1033)
        at 
org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1011)
        at 
org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
        at 
org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
        at org.scalatest.tools.Runner$.run(Runner.scala:850)
        at org.scalatest.tools.Runner.run(Runner.scala)
        at 
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
        at 
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
   Caused by: java.lang.IllegalArgumentException: Error while instantiating 
'org.apache.spark.sql.hive.HiveSessionStateBuilder':
        at 
org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1054)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:156)
        at scala.Option.getOrElse(Option.scala:189)
        at 
org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:154)
        at 
org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:151)
        at 
org.apache.spark.sql.SparkSession.$anonfun$new$3(SparkSession.scala:105)
        at scala.Option.map(Option.scala:230)
        at 
org.apache.spark.sql.SparkSession.$anonfun$new$1(SparkSession.scala:105)
        at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:164)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:183)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:127)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:300)
        at 
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:421)
        at 
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:314)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:68)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:67)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:221)
        at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
        ... 58 more
   Caused by: java.lang.ClassNotFoundException: 
test.custom.listener.DummyQueryExecutionListener
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:206)
        at 
org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:2746)
        at 
scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
        at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
        at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2744)
        at 
org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1(QueryExecutionListener.scala:83)
        at 
org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1$adapted(QueryExecutionListener.scala:82)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.sql.util.ExecutionListenerManager.<init>(QueryExecutionListener.scala:82)
        at 
org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$listenerManager$2(BaseSessionStateBuilder.scala:293)
        at scala.Option.getOrElse(Option.scala:189)
        at 
org.apache.spark.sql.internal.BaseSessionStateBuilder.listenerManager(BaseSessionStateBuilder.scala:293)
        at 
org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:320)
        at 
org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1051)
        ... 80 more
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to