This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 5a0bc96d5f5e [SPARK-46732][CONNECT][3.5] Make Subquery/Broadcast 
thread work with Connect's artifact management
5a0bc96d5f5e is described below

commit 5a0bc96d5f5e42fa5e6ea9d024da572343f239a9
Author: xieshuaihu <xieshua...@agora.io>
AuthorDate: Wed Jan 17 15:24:58 2024 +0900

    [SPARK-46732][CONNECT][3.5] Make Subquery/Broadcast thread work with 
Connect's artifact management
    
    ### What changes were proposed in this pull request?
    
    Similar to SPARK-44794, propagate `JobArtifactState` to the broadcast/subquery 
threads.
    
    This is an example:
    
    ```scala
    val add1 = udf((i: Long) => i + 1)
    val tableA = spark.range(2).alias("a")
    val tableB = 
broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b")
    tableA.join(tableB).
      where(col("a.id")===col("b.id")).
      select(col("a.id").alias("a_id"), col("b.id").alias("b_id")).
      collect().
      mkString("[", ", ", "]")
    ```
    
    Before this PR, this example throws a `ClassNotFoundException`, because 
subquery and broadcast execution use a separate thread pool whose threads do not have the 
`JobArtifactState`.
    
    ### Why are the changes needed?
    Bug fix: make the Subquery/Broadcast threads work with Connect's artifact 
management.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add a new test to `ReplE2ESuite`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44763 from xieshuaihu/SPARK-46732backport.
    
    Authored-by: xieshuaihu <xieshua...@agora.io>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/sql/application/ReplE2ESuite.scala  | 16 ++++++++++++++++
 .../org/apache/spark/sql/execution/SQLExecution.scala    |  5 +++--
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
index 5bb8cbf3543b..9d61b4d56e1e 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
@@ -362,4 +362,20 @@ class ReplE2ESuite extends RemoteSparkSession with 
BeforeAndAfterEach {
     val output = runCommandsInShell(input)
     assertContains("noException: Boolean = true", output)
   }
+
+  test("broadcast works with REPL generated code") {
+    val input =
+      """
+        |val add1 = udf((i: Long) => i + 1)
+        |val tableA = spark.range(2).alias("a")
+        |val tableB = 
broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b")
+        |tableA.join(tableB).
+        |  where(col("a.id")===col("b.id")).
+        |  select(col("a.id").alias("a_id"), col("b.id").alias("b_id")).
+        |  collect().
+        |  mkString("[", ", ", "]")
+        |""".stripMargin
+    val output = runCommandsInShell(input)
+    assertContains("""String = "[[1,1]]"""", output)
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index daeac699c279..b4cbb6135223 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future => 
JFuture}
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.{ErrorMessageFormat, SparkContext, SparkThrowable, 
SparkThrowableHelper}
+import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkContext, 
SparkThrowable, SparkThrowableHelper}
 import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, 
SPARK_EXECUTOR_PREFIX}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.SparkSession
@@ -215,7 +215,8 @@ object SQLExecution {
     val activeSession = sparkSession
     val sc = sparkSession.sparkContext
     val localProps = Utils.cloneProperties(sc.getLocalProperties)
-    exec.submit(() => {
+    val artifactState = JobArtifactSet.getCurrentJobArtifactState.orNull
+    exec.submit(() => JobArtifactSet.withActiveJobArtifactState(artifactState) 
{
       val originalSession = SparkSession.getActiveSession
       val originalLocalProps = sc.getLocalProperties
       SparkSession.setActiveSession(activeSession)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to