This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e34427d4f3 [SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client
2e34427d4f3 is described below

commit 2e34427d4f3f537484b9396cb491025aa21ef46b
Author: Erik Krogen <xkro...@apache.org>
AuthorDate: Wed Mar 1 13:45:31 2023 -0800

    [SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client
    
    ### What changes were proposed in this pull request?
    When using the 'builtin' Hive version for the Hive metadata client, do not create a separate classloader; instead, continue to use the overall user/application classloader (regardless of Java version). This standardizes the behavior for all Java versions with that of Java 9+. See SPARK-42539 for more details on why this approach was chosen.
    
    Please note that this is a re-submit of #40144. That one introduced test failures, and potentially a real issue, because the PR works by setting `isolationOn = false` for `builtin` mode. In addition to adjusting the classloader, `HiveClientImpl` relies on `isolationOn` to determine if it should use an isolated copy of `SessionState`, so the PR inadvertently switched to using a shared `SessionState` object. I think we do want to continue to have the isolated session state even in `buil [...]
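    To make the session-state point concrete, here is a minimal Java sketch (all class names here are invented for illustration; this is not Spark code) of why a class defined by a second classloader carries its own static state. This is the mechanism behind an isolated `SessionState`, and it mirrors the barrier-class handling in `IsolatedClientLoader` (re-defining a class from its bytes rather than delegating to the parent):

```java
import java.io.InputStream;

// Sketch: a loader that re-defines a class from its bytes gives that
// class an independent copy of its static state per loader.
public class IsolationDemo {
    public static class Counter {
        public static int value = 0;
    }

    // "Barrier class" style loader: re-define Counter instead of delegating.
    static class BarrierLoader extends ClassLoader {
        BarrierLoader(ClassLoader parent) { super(parent); }
        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals("IsolationDemo$Counter")) {
                try (InputStream in = getParent().getResourceAsStream("IsolationDemo$Counter.class")) {
                    byte[] bytes = in.readAllBytes();
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (Exception e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
            return super.loadClass(name, resolve);
        }
    }

    public static void main(String[] args) throws Exception {
        Counter.value = 42;  // mutate the copy held by the application loader

        ClassLoader app = IsolationDemo.class.getClassLoader();
        Class<?> isolated = new BarrierLoader(app).loadClass("IsolationDemo$Counter");
        // The re-defined copy has its own statics, untouched by the mutation above.
        System.out.println(isolated.getField("value").getInt(null)); // prints 0
        System.out.println(Counter.value);                           // prints 42
    }
}
```

    Run as `java IsolationDemo`: the isolated copy reports its own untouched static value while the application loader's copy retains the mutation, which is why dropping `isolationOn` without a session-state override silently shared `SessionState`.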
    
    ### Why are the changes needed?
    Please see a much more detailed description in SPARK-42539. The tl;dr is that user-provided JARs (such as `hive-exec-2.3.8.jar`) take precedence over Spark/system JARs when constructing the classloader used by `IsolatedClientLoader` on Java 8 in 'builtin' mode, which can cause unexpected behavior and/or breakages. This violates the expectation that, unless user-first classloader mode is used, Spark JARs should be prioritized over user JARs. It also seems that this separate classloader [...]
    > attempt to discover the jars that were used to load Spark SQL and use those. This option is only valid when using the execution version of Hive.
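    The precedence problem described above can be reproduced outside Spark. Here is a hedged Java sketch (directories stand in for the user and Spark JARs, and the file name is invented for illustration) showing that a `URLClassLoader` serves a shared name from whichever classpath entry appears first:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: classpath entries in a URLClassLoader are searched in order,
// so whichever entry comes first "wins" for a shared resource/class name.
public class OrderingDemo {
    static String read(ClassLoader cl, String name) throws Exception {
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(cl.getResourceAsStream(name)))) {
            return r.readLine();
        }
    }

    public static void main(String[] args) throws Exception {
        Path userJar = Files.createTempDirectory("user");   // stands in for hive-exec-2.3.8.jar
        Path sparkJar = Files.createTempDirectory("spark"); // stands in for Spark's builtin Hive
        Files.writeString(userJar.resolve("HiveConf.txt"), "user copy");
        Files.writeString(sparkJar.resolve("HiveConf.txt"), "spark copy");

        URL u = userJar.toUri().toURL();
        URL s = sparkJar.toUri().toURL();
        // User entry first: the user copy shadows Spark's (the SPARK-42539 problem).
        System.out.println(read(new URLClassLoader(new URL[]{u, s}, null), "HiveConf.txt"));
        // Spark entry first: Spark's copy wins.
        System.out.println(read(new URLClassLoader(new URL[]{s, u}, null), "HiveConf.txt"));
    }
}
```

    The same ordering rule applied to the URLs collected by `IsolatedClientLoader` is what let a user's `hive-exec` JAR shadow the builtin Hive classes on Java 8.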
    
    I can't follow the logic here; the user classloader clearly has all of the necessary Hive JARs, since that's where we're getting the JAR URLs from, so we could just use that directly instead of grabbing the URLs. When this was initially added, it only used the JARs from the user classloader, not any of its parents, which I suspect was the motivating factor (to try to avoid more Spark classes being duplicated inside of the isolated classloader, I guess). But that was changed a month la [...]
    
    ### Does this PR introduce _any_ user-facing change?
    No, except to protect Spark itself from potentially being broken by bad user JARs.
    
    ### How was this patch tested?
    This includes a new unit test in `HiveUtilsSuite` which demonstrates the issue and shows that this approach resolves it. It has also been tested on a live cluster running Java 8, and Hive communication functionality continues to work as expected.
    
    Unit tests failing in #40144 have been locally tested (`HiveUtilsSuite`, `HiveSharedStateSuite`, `HiveCliSessionStateSuite`, `JsonHadoopFsRelationSuite`).
    
    Closes #40224 from xkrogen/xkrogen/SPARK-42539/hive-isolatedclientloader-builtin-user-jar-conflict-fix/take2.
    
    Authored-by: Erik Krogen <xkro...@apache.org>
    Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../main/scala/org/apache/spark/TestUtils.scala    |  5 +-
 .../org/apache/spark/sql/hive/HiveUtils.scala      | 37 +--------
 .../spark/sql/hive/client/HiveClientImpl.scala     |  2 +-
 .../sql/hive/client/IsolatedClientLoader.scala     | 94 ++++++++++++----------
 .../org/apache/spark/sql/hive/HiveUtilsSuite.scala | 34 +++++++-
 5 files changed, 91 insertions(+), 81 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index bdf81d22efa..13ae6aca38b 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -193,12 +193,15 @@ private[spark] object TestUtils {
       baseClass: String = null,
       classpathUrls: Seq[URL] = Seq.empty,
       implementsClasses: Seq[String] = Seq.empty,
-      extraCodeBody: String = ""): File = {
+      extraCodeBody: String = "",
+      packageName: Option[String] = None): File = {
     val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
     val implementsText =
       "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ")
+    val packageText = packageName.map(p => s"package $p;\n").getOrElse("")
     val sourceFile = new JavaSourceFromString(className,
       s"""
+         |$packageText
          |public class $className $extendsText $implementsText {
          |  @Override public String toString() { return "$toStringValue"; }
          |
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index fe9bdef3d0e..4637a4a0179 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
-import java.net.{URL, URLClassLoader}
+import java.net.URL
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.util.Try
 
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -46,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types._
-import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
+import org.apache.spark.util.Utils
 
 
 private[spark] object HiveUtils extends Logging {
@@ -409,43 +408,15 @@ private[spark] object HiveUtils extends Logging {
             s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.")
       }
 
-      // We recursively find all jars in the class loader chain,
-      // starting from the given classLoader.
-      def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
-        case null => Array.empty[URL]
-        case childFirst: ChildFirstURLClassLoader =>
-          childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader)
-        case urlClassLoader: URLClassLoader =>
-          urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
-        case other => allJars(other.getParent)
-      }
-
-      val classLoader = Utils.getContextOrSparkClassLoader
-      val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-        // Do nothing. The system classloader is no longer a URLClassLoader in Java 9,
-        // so it won't match the case in allJars. It no longer exposes URLs of
-        // the system classpath
-        Array.empty[URL]
-      } else {
-        val loadedJars = allJars(classLoader)
-        // Verify at least one jar was found
-        if (loadedJars.length == 0) {
-          throw new IllegalArgumentException(
-            "Unable to locate hive jars to connect to metastore. " +
-              s"Please set ${HIVE_METASTORE_JARS.key}.")
-        }
-        loadedJars
-      }
-
       logInfo(
        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
         hadoopConf = hadoopConf,
-        execJars = jars.toSeq,
         config = configurations,
-        isolationOn = !isCliSessionState(),
+        isolationOn = false,
+        sessionStateIsolationOverride = Some(!isCliSessionState()),
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 0a83ec2689c..f76cc7f3a41 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -134,7 +134,7 @@ private[hive] class HiveClientImpl(
   // Create an internal session state for this HiveClientImpl.
   val state: SessionState = {
     val original = Thread.currentThread().getContextClassLoader
-    if (clientLoader.isolationOn) {
+    if (clientLoader.sessionStateIsolationOn) {
       // Switch to the initClassLoader.
       Thread.currentThread().setContextClassLoader(initClassLoader)
       try {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index e65e6d42937..2756a2b18a9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -180,6 +180,9 @@ private[hive] object IsolatedClientLoader extends Logging {
 * @param config   A set of options that will be added to the HiveConf of the constructed client.
 * @param isolationOn When true, custom versions of barrier classes will be constructed.  Must be
 *                    true unless loading the version of hive that is on Spark's classloader.
+ * @param sessionStateIsolationOverride If present, this parameter will specify the value of
+ *                                      `sessionStateIsolationOn`. If empty (the default), the
+ *                                      value of `isolationOn` will be used.
 * @param baseClassLoader The spark classloader that is used to load shared classes.
 */
 private[hive] class IsolatedClientLoader(
@@ -189,11 +192,19 @@ private[hive] class IsolatedClientLoader(
     val execJars: Seq[URL] = Seq.empty,
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
+    sessionStateIsolationOverride: Option[Boolean] = None,
    val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
     val sharedPrefixes: Seq[String] = Seq.empty,
     val barrierPrefixes: Seq[String] = Seq.empty)
   extends Logging {
 
+  /**
+   * This controls whether the generated clients maintain an independent/isolated copy of the
+   * Hive `SessionState`. If false, the Hive will leverage the global/static copy of
+   * `SessionState`; if true, it will generate a new copy of the state internally.
+   */
+  val sessionStateIsolationOn: Boolean = sessionStateIsolationOverride.getOrElse(isolationOn)
+
   /** All jars used by the hive specific classloader. */
   protected def allJars = execJars.toArray
 
@@ -232,51 +243,46 @@ private[hive] class IsolatedClientLoader(
   private[hive] val classLoader: MutableURLClassLoader = {
     val isolatedClassLoader =
       if (isolationOn) {
-        if (allJars.isEmpty) {
-          // See HiveUtils; this is the Java 9+ + builtin mode scenario
-          baseClassLoader
-        } else {
-          val rootClassLoader: ClassLoader =
-            if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-              // In Java 9, the boot classloader can see few JDK classes. The intended parent
-              // classloader for delegation is now the platform classloader.
-              // See http://java9.wtf/class-loading/
-              val platformCL =
-              classOf[ClassLoader].getMethod("getPlatformClassLoader").
-                invoke(null).asInstanceOf[ClassLoader]
-              // Check to make sure that the root classloader does not know about Hive.
-              assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
-              platformCL
+        val rootClassLoader: ClassLoader =
+          if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+            // In Java 9, the boot classloader can see few JDK classes. The intended parent
+            // classloader for delegation is now the platform classloader.
+            // See http://java9.wtf/class-loading/
+            val platformCL =
+            classOf[ClassLoader].getMethod("getPlatformClassLoader").
+              invoke(null).asInstanceOf[ClassLoader]
+            // Check to make sure that the root classloader does not know about Hive.
+            assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
+            platformCL
+          } else {
+            // The boot classloader is represented by null (the instance itself isn't accessible)
+            // and before Java 9 can see all JDK classes
+            null
+          }
+        new URLClassLoader(allJars, rootClassLoader) {
+          override def loadClass(name: String, resolve: Boolean): Class[_] = {
+            val loaded = findLoadedClass(name)
+            if (loaded == null) doLoadClass(name, resolve) else loaded
+          }
+          def doLoadClass(name: String, resolve: Boolean): Class[_] = {
+            val classFileName = name.replaceAll("\\.", "/") + ".class"
+            if (isBarrierClass(name)) {
+              // For barrier classes, we construct a new copy of the class.
+              val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
+              logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
+              defineClass(name, bytes, 0, bytes.length)
+            } else if (!isSharedClass(name)) {
+              logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
+              super.loadClass(name, resolve)
            } else {
-              // The boot classloader is represented by null (the instance itself isn't accessible)
-              // and before Java 9 can see all JDK classes
-              null
-            }
-          new URLClassLoader(allJars, rootClassLoader) {
-            override def loadClass(name: String, resolve: Boolean): Class[_] = {
-              val loaded = findLoadedClass(name)
-              if (loaded == null) doLoadClass(name, resolve) else loaded
-            }
-            def doLoadClass(name: String, resolve: Boolean): Class[_] = {
-              val classFileName = name.replaceAll("\\.", "/") + ".class"
-              if (isBarrierClass(name)) {
-                // For barrier classes, we construct a new copy of the class.
-                val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
-                logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
-                defineClass(name, bytes, 0, bytes.length)
-              } else if (!isSharedClass(name)) {
-                logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
-                super.loadClass(name, resolve)
-              } else {
-                // For shared classes, we delegate to baseClassLoader, but fall back in case the
-                // class is not found.
-                logDebug(s"shared class: $name")
-                try {
-                  baseClassLoader.loadClass(name)
-                } catch {
-                  case _: ClassNotFoundException =>
-                    super.loadClass(name, resolve)
-                }
+              // For shared classes, we delegate to baseClassLoader, but fall back in case the
+              // class is not found.
+              logDebug(s"shared class: $name")
+              try {
+                baseClassLoader.loadClass(name)
+              } catch {
+                case _: ClassNotFoundException =>
+                  super.loadClass(name, resolve)
              }
            }
          }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
index d8e1e012928..823ac8ed957 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
@@ -17,15 +17,19 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.util.ChildFirstURLClassLoader
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader}
 
 class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
@@ -77,6 +81,32 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     }
   }
 
+  test("SPARK-42539: User-provided JARs should not take precedence over builtin Hive JARs") {
+    withTempDir { tmpDir =>
+      val classFile = TestUtils.createCompiledClass(
+        "Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata"))
+
+      val jarFile = new File(tmpDir, "hive-fake.jar")
+      TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata"))
+
+      val conf = new SparkConf
+      val contextClassLoader = Thread.currentThread().getContextClassLoader
+      val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader)
+      try {
+        Thread.currentThread().setContextClassLoader(loader)
+        val client = HiveUtils.newClientForMetadata(
+          conf,
+          SparkHadoopUtil.newConfiguration(conf),
+          HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true))
+        client.createDatabase(
+          CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()),
+          ignoreIfExists = true)
+      } finally {
+        Thread.currentThread().setContextClassLoader(contextClassLoader)
+      }
+    }
+  }
+
   test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") {
     // Test default value
     val defaultConf = new Configuration

