cloud-fan commented on code in PR #55977:
URL: https://github.com/apache/spark/pull/55977#discussion_r3266732184


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala:
##########
@@ -265,6 +275,42 @@ class CatalogManager(
       currentCatalog, currentNamespace,
       currentCatalog, currentNamespace)
 
+  /**
+   * Snapshot the live PATH-derived [[SessionCatalog.SessionFunctionKind]] order used by
+   * unqualified function/table-function resolution.
+   *
+   * The kinds list is computed by reading the current catalog, current namespace, and the

Review Comment:
   Minor: this docstring states the (catalog, namespace, path) triple is 
observed atomically, but `v1CurrentDb` (read at L299) is intentionally outside 
the lock. The kinds extraction only consumes `system.builtin` / 
`system.session` entries, so a torn v1 read can't affect the result — but 
readers tracing this code may not see why the staleness is harmless. One extra 
clause acknowledging that v1's contribution to `effectiveNs` is intentionally 
racy (and explaining why it doesn't matter for the kinds list) would head off 
confusion.
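
   To make the "harmless staleness" argument concrete, here is a minimal sketch. 
   It is not the PR's code: `KindsSketch`, `kinds`, and `effectivePath` are 
   hypothetical names, and the path ordering is an assumption for illustration. 
   It only models the claim that the extraction keeps `system.builtin` / 
   `system.session` entries and drops everything else.

   ```scala
   // Hypothetical model; names and path layout are illustrative, not Spark's API.
   object KindsSketch {
     sealed trait Kind
     case object Builtin extends Kind
     case object Session extends Kind

     // A path entry is a multi-part namespace such as Seq("system", "builtin").
     // Only the two system entries map to a kind; all other entries are dropped.
     def kinds(path: Seq[Seq[String]]): Seq[Kind] = path.collect {
       case Seq("system", "builtin") => Builtin
       case Seq("system", "session") => Session
     }

     // Assumed path shape: the v1 current database contributes one entry.
     def effectivePath(v1CurrentDb: String): Seq[Seq[String]] = Seq(
       Seq("system", "session"),
       Seq("spark_catalog", v1CurrentDb),
       Seq("system", "builtin"))
   }
   ```

   Under these assumptions, `kinds(effectivePath("default"))` and 
   `kinds(effectivePath("stale_db"))` are equal, which is the property the extra 
   docstring clause should state.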



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala:
##########
@@ -330,15 +376,32 @@ class CatalogManager(
     catalog(_currentCatalogName.getOrElse(conf.getConf(SQLConf.DEFAULT_CATALOG)))
   }
 
-  def setCurrentCatalog(catalogName: String): Unit = synchronized {
-    // `setCurrentCatalog` is noop if it doesn't switch to a different catalog.
-    if (currentCatalog.name() != catalogName) {
-      catalog(catalogName)
-      _currentCatalogName = Some(catalogName)
-      _currentNamespace = None
+  def setCurrentCatalog(catalogName: String): Unit = {
+    // SPARK-56939: see [[setCurrentNamespace]]. Avoid nesting [[CatalogManager]]'s lock
+    // across [[v1SessionCatalog.setCurrentDatabase]] (which synchronizes on
+    // [[SessionCatalog]]) to prevent a lock-order inversion with concurrent unqualified
+    // function resolution.
+    val needsSwitch = synchronized {
+      // `setCurrentCatalog` is noop if it doesn't switch to a different catalog.
+      if (currentCatalog.name() != catalogName) {
+        // Force-load the named catalog while holding the manager lock to keep the
+        // not-found error semantics; if loading fails, throw before mutating state.
+        catalog(catalogName)
+        true
+      } else {
+        false
+      }
+    }
+    if (needsSwitch) {
       // Reset the current database of v1 `SessionCatalog` when switching current catalog, so that
       // when we switch back to session catalog, the current namespace definitely is ["default"].
+      // Run this BEFORE publishing the new catalog name so that if a reader observes the new

Review Comment:
   This justification only tells half the story. "If a reader observes the new 
catalog, the v1 state is already consistent with it" is true, but the symmetric 
case is now allowed: while the new name is *not yet* published, a concurrent 
reader sees `(oldCatalog, v1.currentDb=default)` — a torn state the pre-PR 
atomic version forbade. If the old catalog was the session catalog (the common 
case), the user's previous namespace is briefly invisible to that reader. Worth 
mentioning the trade-off so a future maintainer doesn't read this comment and 
conclude the new ordering is strictly better.
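
   A small sketch of the window in question, under stated assumptions: `Manager`, 
   `V1Catalog`, `snapshot`, and the `betweenSteps` hook are all hypothetical and 
   exist only to make the interleaving deterministic. The hook stands in for a 
   concurrent reader scheduled between the v1 reset and the publication of the 
   new catalog name.

   ```scala
   // Hypothetical model of the split-lock ordering; not Spark's actual classes.
   object TornStateSketch {
     final class V1Catalog { @volatile var currentDb: String = "default" }

     final class Manager(val v1: V1Catalog) {
       private var catalogName: String = "spark_catalog"

       // Reader: the catalog name is read under the manager lock, v1 outside it.
       def snapshot(): (String, String) = {
         val name = synchronized { catalogName }
         (name, v1.currentDb)
       }

       // Writer: `betweenSteps` simulates a reader running inside the window.
       def setCurrentCatalog(newName: String)(betweenSteps: => Unit): Unit = {
         val needsSwitch = synchronized { catalogName != newName }
         if (needsSwitch) {
           v1.currentDb = "default"               // step 1: reset v1, manager lock not held
           betweenSteps                           // a concurrent reader may run here
           synchronized { catalogName = newName } // step 2: publish the new name
         }
       }
     }

     // Returns what a reader sees if it runs between step 1 and step 2.
     def observeWindow(): (String, String) = {
       val v1 = new V1Catalog
       v1.currentDb = "sales" // the user's previous namespace in the session catalog
       val m = new Manager(v1)
       var seen: (String, String) = null
       m.setCurrentCatalog("testcat") { seen = m.snapshot() }
       seen
     }
   }
   ```

   In this model `observeWindow()` yields `("spark_catalog", "default")`: the old 
   catalog paired with a namespace the user never selected. That is the trade-off 
   the comment should name alongside the benefit.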



##########
sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala:
##########
@@ -977,6 +977,83 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-56939: concurrent USE SCHEMA and unqualified function lookups do not deadlock") {
+    // Regression for SPARK-56939. Prior to the fix, [[CatalogManager.setCurrentNamespace]]
+    // held the manager's intrinsic lock while calling into
+    // [[SessionCatalog.setCurrentDatabaseWithNameCheck]] (which takes the catalog's
+    // intrinsic lock), while concurrent unqualified function resolution acquired the
+    // catalog's intrinsic lock and then reached back into the manager via
+    // [[CatalogManager.sqlResolutionPathEntries]]. That lock-order inversion deadlocked the
+    // session whenever a `USE SCHEMA` raced with any unqualified function reference.
+    //
+    // The hazard is independent of [[SQLConf.PATH_ENABLED]] and the
+    // resolution-order setting, so this test exercises the default configuration.
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s1")
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s2")
+    try {
+      val budget = 200
+      val iterations = new java.util.concurrent.atomic.AtomicInteger(0)
+      val barrier = new java.util.concurrent.CyclicBarrier(2)
+      val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+
+      val useThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            sql(if ((i % 2) == 0) "USE SCHEMA spark_56939_s1" else "USE SCHEMA spark_56939_s2")

Review Comment:
   Both `setCurrentNamespace` and `setCurrentCatalog` were rewritten with the 
same split-lock pattern, but only `USE SCHEMA` is exercised here. A symmetric 
thread that toggles `USE CATALOG` between `spark_catalog` and a registered v2 
catalog would close coverage of the `setCurrentCatalog` arm and guard against 
asymmetric regressions in one path or the other.
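
   One possible shape for this, as a sketch rather than a drop-in patch: a small 
   harness that releases N looping actions together and reports whether all of 
   them finished within the join budget. `ToggleHarnessSketch` and 
   `runConcurrently` are hypothetical names; the barrier, error queue, and timed 
   join mirror what the test already does.

   ```scala
   import java.util.concurrent.{ConcurrentLinkedQueue, CyclicBarrier}

   // Hypothetical helper; mirrors the barrier + timed-join structure of the test.
   object ToggleHarnessSketch {
     // Runs each action up to `budget` times on its own thread, all released
     // together, and returns (allThreadsFinished, collectedErrors).
     def runConcurrently(budget: Int, joinMillis: Long)(
         actions: (Int => Unit)*): (Boolean, List[Throwable]) = {
       val barrier = new CyclicBarrier(actions.size)
       val errors = new ConcurrentLinkedQueue[Throwable]()
       val threads = actions.zipWithIndex.map { case (action, idx) =>
         val t = new Thread(() => {
           try {
             barrier.await()
             var i = 0
             while (i < budget && errors.isEmpty) {
               action(i)
               i += 1
             }
           } catch {
             case e: Throwable => errors.add(e)
           }
         }, s"toggle-sketch-$idx")
         t.setDaemon(true) // a deadlocked thread must not keep the JVM alive
         t
       }
       threads.foreach(_.start())
       threads.foreach(_.join(joinMillis))
       val finished = threads.forall(!_.isAlive)
       (finished, errors.toArray(new Array[Throwable](0)).toList)
     }
   }
   ```

   In the suite, a third action could be something like 
   `i => sql(if (i % 2 == 0) "USE CATALOG spark_catalog" else "USE CATALOG testcat")`, 
   where `testcat` is a placeholder for whatever v2 catalog the test registers.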



##########
sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala:
##########
@@ -977,6 +977,83 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-56939: concurrent USE SCHEMA and unqualified function lookups do not deadlock") {
+    // Regression for SPARK-56939. Prior to the fix, [[CatalogManager.setCurrentNamespace]]
+    // held the manager's intrinsic lock while calling into
+    // [[SessionCatalog.setCurrentDatabaseWithNameCheck]] (which takes the catalog's
+    // intrinsic lock), while concurrent unqualified function resolution acquired the
+    // catalog's intrinsic lock and then reached back into the manager via
+    // [[CatalogManager.sqlResolutionPathEntries]]. That lock-order inversion deadlocked the
+    // session whenever a `USE SCHEMA` raced with any unqualified function reference.
+    //
+    // The hazard is independent of [[SQLConf.PATH_ENABLED]] and the
+    // resolution-order setting, so this test exercises the default configuration.
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s1")
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s2")
+    try {
+      val budget = 200
+      val iterations = new java.util.concurrent.atomic.AtomicInteger(0)
+      val barrier = new java.util.concurrent.CyclicBarrier(2)
+      val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+
+      val useThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            sql(if ((i % 2) == 0) "USE SCHEMA spark_56939_s1" else "USE SCHEMA spark_56939_s2")
+            i += 1
+          }
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      }, "SPARK-56939-use-schema")
+
+      val lookupThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            // Unqualified `count(*)` exercises the kinds-order provider that resolves
+            // against the live PATH via [[CatalogManager]] -- the side of the cycle
+            // that previously acquired the catalog lock first and then the manager lock.
+            val n = sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)")
+              .head().getLong(0)
+            assert(n == 3L, s"unexpected count: $n at iteration $i")
+            iterations.incrementAndGet()
+            i += 1
+          }
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      }, "SPARK-56939-lookup")
+
+      useThread.start()
+      lookupThread.start()
+
+      // Generous join: 30s is plenty for 200 cheap queries on either side and gives a
+      // clear failure signal if the implementation regresses into a deadlock.
+      val joinMillis = 30000L
+      useThread.join(joinMillis)
+      lookupThread.join(joinMillis)
+
+      assert(!useThread.isAlive,
+        "USE SCHEMA thread did not finish; lock-order inversion between SessionCatalog and " +
+          "CatalogManager likely returned (SPARK-56939).")

Review Comment:
   Minor wording: "likely returned" is ambiguous, since "returned" can be read as 
   a transitive verb rather than as "came back". Suggest "likely regressed", with 
   the same change at L1045.
   ```suggestion
             "CatalogManager likely regressed (SPARK-56939).")
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala:
##########
@@ -977,6 +977,83 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-56939: concurrent USE SCHEMA and unqualified function lookups do not deadlock") {
+    // Regression for SPARK-56939. Prior to the fix, [[CatalogManager.setCurrentNamespace]]
+    // held the manager's intrinsic lock while calling into
+    // [[SessionCatalog.setCurrentDatabaseWithNameCheck]] (which takes the catalog's
+    // intrinsic lock), while concurrent unqualified function resolution acquired the
+    // catalog's intrinsic lock and then reached back into the manager via
+    // [[CatalogManager.sqlResolutionPathEntries]]. That lock-order inversion deadlocked the
+    // session whenever a `USE SCHEMA` raced with any unqualified function reference.
+    //
+    // The hazard is independent of [[SQLConf.PATH_ENABLED]] and the
+    // resolution-order setting, so this test exercises the default configuration.
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s1")
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s2")
+    try {
+      val budget = 200
+      val iterations = new java.util.concurrent.atomic.AtomicInteger(0)
+      val barrier = new java.util.concurrent.CyclicBarrier(2)
+      val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+
+      val useThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            sql(if ((i % 2) == 0) "USE SCHEMA spark_56939_s1" else "USE SCHEMA spark_56939_s2")
+            i += 1
+          }
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      }, "SPARK-56939-use-schema")
+
+      val lookupThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            // Unqualified `count(*)` exercises the kinds-order provider that resolves
+            // against the live PATH via [[CatalogManager]] -- the side of the cycle
+            // that previously acquired the catalog lock first and then the manager lock.
+            val n = sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)")
+              .head().getLong(0)
+            assert(n == 3L, s"unexpected count: $n at iteration $i")
+            iterations.incrementAndGet()
+            i += 1
+          }
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      }, "SPARK-56939-lookup")
+
+      useThread.start()
+      lookupThread.start()
+
+      // Generous join: 30s is plenty for 200 cheap queries on either side and gives a
+      // clear failure signal if the implementation regresses into a deadlock.
+      val joinMillis = 30000L
+      useThread.join(joinMillis)
+      lookupThread.join(joinMillis)
+
+      assert(!useThread.isAlive,
+        "USE SCHEMA thread did not finish; lock-order inversion between SessionCatalog and " +
+          "CatalogManager likely returned (SPARK-56939).")
+      assert(!lookupThread.isAlive,
+        "Lookup thread did not finish; lock-order inversion between SessionCatalog and " +
+          "CatalogManager likely returned (SPARK-56939).")

Review Comment:
   Same as above:
   ```suggestion
             "CatalogManager likely regressed (SPARK-56939).")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

